tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [13/50] [abbrv] Rename *.new* packages back to what they should be, remove dead code from the old packages - mapreduce module - tez-engine module (part of TEZ-398). (sseth)
Date Wed, 25 Sep 2013 07:31:20 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java
deleted file mode 100644
index 9ac92ba..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.oldoutput;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.common.sort.SortingOutput;
-import org.apache.tez.engine.records.OutputContext;
-
-/**
- * {@link OldInMemorySortedOutput} is an {@link Output} which sorts key/value pairs 
- * written to it and persists it to a file.
- */
-public class OldInMemorySortedOutput implements SortingOutput {
-  
-  public OldInMemorySortedOutput(TezEngineTaskContext task) throws IOException {
-  }
-  
-  public void initialize(Configuration conf, Master master) 
-      throws IOException, InterruptedException {
-  }
-
-  public void setTask(RunningTaskContext task) {
-  }
-  
-  public void write(Object key, Object value) throws IOException,
-      InterruptedException {
-  }
-
-  public void close() throws IOException, InterruptedException {
-  }
-
-  @Override
-  public OutputContext getOutputContext() {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java
deleted file mode 100644
index b7f913c..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.lib.oldoutput;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.common.TezEngineTaskContext;
-
-public class OldLocalOnFileSorterOutput extends OldOnFileSortedOutput {
-
-  private static final Log LOG = LogFactory.getLog(OldLocalOnFileSorterOutput.class);
-
-  public OldLocalOnFileSorterOutput(TezEngineTaskContext task) throws IOException {
-    super(task);
-  }
-
-  @Override
-  public void close() throws IOException, InterruptedException {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java
deleted file mode 100644
index f259df9..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.oldoutput;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.common.sort.SortingOutput;
-import org.apache.tez.engine.records.OutputContext;
-
-/**
- * {@link OldOnFileSortedOutput} is an {@link Output} which sorts key/value pairs 
- * written to it and persists it to a file.
- */
-public class OldOnFileSortedOutput implements SortingOutput {
-
-  public OldOnFileSortedOutput(TezEngineTaskContext task) throws IOException {
-  }
-  
-  @Override
-  public void initialize(Configuration conf, Master master) 
-      throws IOException, InterruptedException {
-  }
-
-  @Override
-  public void setTask(RunningTaskContext task) {
-  }
-  
-  @Override
-  public void write(Object key, Object value) throws IOException,
-      InterruptedException {
-  }
-
-  @Override
-  public void close() throws IOException, InterruptedException {
-  }
-
-  @Override
-  public OutputContext getOutputContext() {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
index 5d2a2ba..218aa21 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
@@ -22,9 +22,9 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.engine.api.KVWriter;
 import org.apache.tez.engine.common.sort.impl.dflt.InMemoryShuffleSorter;
 import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVWriter;
 import org.apache.tez.engine.newapi.LogicalOutput;
 import org.apache.tez.engine.newapi.Output;
 import org.apache.tez.engine.newapi.TezOutputContext;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
index d23ac1e..963276d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
@@ -25,7 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 import org.apache.tez.engine.newapi.Event;
 
 public class LocalOnFileSorterOutput extends OnFileSortedOutput {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
index ffb36c5..7e0ca37 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
@@ -26,11 +26,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.engine.api.KVWriter;
 import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.engine.common.sort.impl.ExternalSorter;
 import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter;
 import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVWriter;
 import org.apache.tez.engine.newapi.LogicalOutput;
 import org.apache.tez.engine.newapi.TezOutputContext;
 import org.apache.tez.engine.newapi.events.DataMovementEvent;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
index ec193c5..37edde8 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileUnorderedKVOutput.java
@@ -24,10 +24,10 @@ import java.util.List;
 
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.api.KVWriter;
 import org.apache.tez.engine.broadcast.output.FileBasedKVWriter;
 import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVWriter;
 import org.apache.tez.engine.newapi.LogicalOutput;
 import org.apache.tez.engine.newapi.TezOutputContext;
 import org.apache.tez.engine.newapi.events.DataMovementEvent;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
deleted file mode 100644
index 79615ce..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi;
-
-import java.io.IOException;
-
-/**
- * A key/value(s) pair based {@link Reader}.
- * 
- * Example usage
- * <code>
- * while (kvReader.next()) {
- *   KVRecord kvRecord = getCurrentKV();
- *   Object key =  kvRecord.getKey();
- *   Iterable values = kvRecord.getValues();
- * </code>
- *
- */
-public interface KVReader extends Reader {
-
-  /**
-   * Moves to the next key/values(s) pair
-   * 
-   * @return true if another key/value(s) pair exists, false if there are no more.
-   * @throws IOException
-   *           if an error occurs
-   */
-  public boolean next() throws IOException;
-
-  /**
-   * Return the current key/value(s) pair. Use moveToNext() to advance.
-   * @return
-   * @throws IOException
-   */
-  public KVRecord getCurrentKV() throws IOException;
-  
-  // TODO NEWTEZ Move this to getCurrentKey and getCurrentValue independently. Otherwise usage pattern seems to be getCurrentKV.getKey, getCurrentKV.getValue
-  
-  // TODO NEWTEZ KVRecord which does not need to return a list!
-  // TODO NEWTEZ Parameterize this
-  /**
-   * Represents a key and an associated set of values
-   *
-   */
-  public static class KVRecord {
-
-    private Object key;
-    private Iterable<Object> values;
-
-    public KVRecord(Object key, Iterable<Object> values) {
-      this.key = key;
-      this.values = values;
-    }
-
-    public Object getKey() {
-      return this.key;
-    }
-
-    public Iterable<Object> getValues() {
-      return this.values;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
deleted file mode 100644
index ad48912..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi;
-
-import java.io.IOException;
-
-/**
- * A key/value(s) pair based {@link Writer}
- */
-public interface KVWriter extends Writer {
-  /**
-   * Writes a key/value pair.
-   * 
-   * @param key
-   *          the key to write
-   * @param value
-   *          the value to write
-   * @throws IOException
-   *           if an error occurs
-   */
-  public void write(Object key, Object value) throws IOException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java
deleted file mode 100644
index d3a582d..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.events;
-
-import org.apache.tez.engine.newapi.Event;
-
-public class TaskAttemptCompletedEvent extends Event {
-
-  public TaskAttemptCompletedEvent() {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java
deleted file mode 100644
index 772d7fe..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.events;
-
-import org.apache.tez.engine.newapi.Event;
-
-public class TaskAttemptFailedEvent extends Event {
-
-  private final String diagnostics;
-
-  public TaskAttemptFailedEvent(String diagnostics) {
-    this.diagnostics = diagnostics;
-  }
-
-  public String getDiagnostics() {
-    return diagnostics;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskStatusUpdateEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskStatusUpdateEvent.java
deleted file mode 100644
index 0f09867..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskStatusUpdateEvent.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.events;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.engine.newapi.Event;
-
-public class TaskStatusUpdateEvent extends Event implements Writable {
-
-  private TezCounters tezCounters;
-  private float progress;
-
-  public TaskStatusUpdateEvent() {
-  }
-
-  public TaskStatusUpdateEvent(TezCounters tezCounters, float progress) {
-    this.tezCounters = tezCounters;
-    this.progress = progress;
-  }
-
-  public TezCounters getCounters() {
-    return tezCounters;
-  }
-
-  public float getProgress() {
-    return progress;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeFloat(progress);
-    if (tezCounters != null) {
-      out.writeBoolean(true);
-      tezCounters.write(out);
-    } else {
-      out.writeBoolean(false);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    progress = in.readFloat();
-    if (in.readBoolean()) {
-      tezCounters = new TezCounters();
-      tezCounters.readFields(in);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
deleted file mode 100644
index 9faafc5..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-/**
- * Class that encapsulates all the information to identify the unique
- * object that either generated an Event or is the recipient of an Event.
- */
-public class EventMetaData implements Writable {
-
-  public static enum EventProducerConsumerType {
-    INPUT,
-    PROCESSOR,
-    OUTPUT,
-    SYSTEM
-  }
-
-  /**
-   * Producer Type ( one of Input/Output/Processor ) that generated the Event
-   * or Consumer Type that will consume the Event.
-   */
-  private EventProducerConsumerType producerConsumerType;
-
-  /**
-   * Name of the vertex where the event was generated.
-   */
-  private String taskVertexName;
-
-  /**
-   * Name of the vertex to which the Input or Output is connected to.
-   */
-  private String edgeVertexName;
-
-  /**
-   * i'th physical input/output that this event maps to.
-   */
-  private int index;
-
-  /**
-   * Task Attempt ID
-   */
-  private TezTaskAttemptID taskAttemptID;
-
-  public EventMetaData() {
-  }
-
-  public EventMetaData(EventProducerConsumerType generator,
-      String taskVertexName, String edgeVertexName,
-      TezTaskAttemptID taskAttemptID) {
-    this.producerConsumerType = generator;
-    this.taskVertexName = taskVertexName;
-    this.edgeVertexName = edgeVertexName;
-    this.taskAttemptID = taskAttemptID;
-  }
-
-  public EventProducerConsumerType getEventGenerator() {
-    return producerConsumerType;
-  }
-
-  public TezTaskAttemptID getTaskAttemptID() {
-    return taskAttemptID;
-  }
-
-  public String getTaskVertexName() {
-    return taskVertexName;
-  }
-
-  public String getEdgeVertexName() {
-    return edgeVertexName;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(producerConsumerType.ordinal());
-    if (taskVertexName != null) {
-      out.writeBoolean(true);
-      out.writeUTF(taskVertexName);
-    } else {
-      out.writeBoolean(false);
-    }
-    if (edgeVertexName != null) {
-      out.writeBoolean(true);
-      out.writeUTF(edgeVertexName);
-    } else {
-      out.writeBoolean(false);
-    }
-    if(taskAttemptID != null) {
-      out.writeBoolean(true);
-      taskAttemptID.write(out);
-    } else {
-      out.writeBoolean(false);
-    }
-    
-    out.writeInt(index);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    producerConsumerType = EventProducerConsumerType.values()[in.readInt()];
-    if (in.readBoolean()) {
-      taskVertexName = in.readUTF();
-    }
-    if (in.readBoolean()) {
-      edgeVertexName = in.readUTF();
-    }
-    if (in.readBoolean()) {
-      taskAttemptID = new TezTaskAttemptID();
-      taskAttemptID.readFields(in);
-    }
-    index = in.readInt();
-  }
-
-  public int getIndex() {
-    return index;
-  }
-
-  public void setIndex(int index) {
-    this.index = index;
-  }
-
-  @Override
-  public String toString() {
-    return "{ producerConsumerType=" + producerConsumerType
-        + ", taskVertexName=" + taskVertexName
-        + ", edgeVertexName=" + edgeVertexName
-        + ", taskAttemptId=" + taskAttemptID
-        + ", index=" + index + " }";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
deleted file mode 100644
index 87d6665..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-public enum EventType {
-  TASK_ATTEMPT_COMPLETED_EVENT,
-  TASK_ATTEMPT_FAILED_EVENT,
-  DATA_MOVEMENT_EVENT,
-  INPUT_READ_ERROR_EVENT,
-  INPUT_FAILED_EVENT,
-  INTPUT_INFORMATION_EVENT,
-  TASK_STATUS_UPDATE_EVENT
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
deleted file mode 100644
index a2b8cc8..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/InputSpec.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
-
-public class InputSpec implements Writable {
-
-  private String sourceVertexName;
-  private InputDescriptor inputDescriptor;
-  private int physicalEdgeCount;
-
-  public InputSpec() {
-  }
-
-  public InputSpec(String sourceVertexName, InputDescriptor inputDescriptor,
-      int physicalEdgeCount) {
-    this.sourceVertexName = sourceVertexName;
-    this.inputDescriptor = inputDescriptor;
-    this.physicalEdgeCount = physicalEdgeCount;
-  }
-
-  public String getSourceVertexName() {
-    return sourceVertexName;
-  }
-
-  public InputDescriptor getInputDescriptor() {
-    return inputDescriptor;
-  }
-
-  public int getPhysicalEdgeCount() {
-    return physicalEdgeCount;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    // TODONEWTEZ convert to PB
-    out.writeUTF(sourceVertexName);
-    out.writeInt(physicalEdgeCount);
-    byte[] inputDescBytes =
-        DagTypeConverters.convertToDAGPlan(inputDescriptor).toByteArray();
-    out.writeInt(inputDescBytes.length);
-    out.write(inputDescBytes);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    sourceVertexName = in.readUTF();
-    physicalEdgeCount = in.readInt();
-    int inputDescLen = in.readInt();
-    byte[] inputDescBytes = new byte[inputDescLen];
-    in.readFully(inputDescBytes);
-    inputDescriptor =
-        DagTypeConverters.convertInputDescriptorFromDAGPlan(
-            TezEntityDescriptorProto.parseFrom(inputDescBytes));
-  }
-
-  public String toString() {
-    return "{ sourceVertexName=" + sourceVertexName
-        + ", physicalEdgeCount" + physicalEdgeCount
-        + ", inputClassName=" + inputDescriptor.getClassName()
-        + " }";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
deleted file mode 100644
index 1b34ef0..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/OutputSpec.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
-
-public class OutputSpec implements Writable {
-
-  private String destinationVertexName;
-  private OutputDescriptor outputDescriptor;
-  private int physicalEdgeCount;
-
-  public OutputSpec() {
-  }
-
-  public OutputSpec(String destinationVertexName,
-      OutputDescriptor inputDescriptor, int physicalEdgeCount) {
-    this.destinationVertexName = destinationVertexName;
-    this.outputDescriptor = inputDescriptor;
-    this.physicalEdgeCount = physicalEdgeCount;
-  }
-
-  public String getDestinationVertexName() {
-    return destinationVertexName;
-  }
-
-  public OutputDescriptor getOutputDescriptor() {
-    return outputDescriptor;
-  }
-
-  public int getPhysicalEdgeCount() {
-    return physicalEdgeCount;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    // TODONEWTEZ convert to PB
-    out.writeUTF(destinationVertexName);
-    out.writeInt(physicalEdgeCount);
-    byte[] inputDescBytes =
-        DagTypeConverters.convertToDAGPlan(outputDescriptor).toByteArray();
-    out.writeInt(inputDescBytes.length);
-    out.write(inputDescBytes);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    destinationVertexName = in.readUTF();
-    physicalEdgeCount = in.readInt();
-    int inputDescLen = in.readInt();
-    byte[] inputDescBytes = new byte[inputDescLen];
-    in.readFully(inputDescBytes);
-    outputDescriptor =
-        DagTypeConverters.convertOutputDescriptorFromDAGPlan(
-            TezEntityDescriptorProto.parseFrom(inputDescBytes));
-  }
-
-  public String toString() {
-    return "{ destinationVertexName=" + destinationVertexName
-        + ", physicalEdgeCount" + physicalEdgeCount
-        + ", outputClassName=" + outputDescriptor.getClassName()
-        + " }";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
deleted file mode 100644
index 8290e30..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TaskSpec.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public class TaskSpec implements Writable {
-
-  private TezTaskAttemptID taskAttemptId;
-  private String vertexName;
-  private String user;
-  private ProcessorDescriptor processorDescriptor;
-  private List<InputSpec> inputSpecList;
-  private List<OutputSpec> outputSpecList;
-
-  public TaskSpec() {
-  }
-
-  // TODO NEWTEZ Remove user
-  public TaskSpec(TezTaskAttemptID taskAttemptID, String user,
-      String vertexName, ProcessorDescriptor processorDescriptor,
-      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
-    this.taskAttemptId = taskAttemptID;
-    this.vertexName = vertexName;
-    this.user = user;
-    this.processorDescriptor = processorDescriptor;
-    this.inputSpecList = inputSpecList;
-    this.outputSpecList = outputSpecList;
-  }
-
-  public String getVertexName() {
-    return vertexName;
-  }
-
-  public TezTaskAttemptID getTaskAttemptID() {
-    return taskAttemptId;
-  }
-
-  public String getUser() {
-    return user;
-  }
-
-  public ProcessorDescriptor getProcessorDescriptor() {
-    return processorDescriptor;
-  }
-
-  public List<InputSpec> getInputs() {
-    return inputSpecList;
-  }
-
-  public List<OutputSpec> getOutputs() {
-    return outputSpecList;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    taskAttemptId.write(out);
-    out.writeUTF(vertexName);
-    byte[] procDesc =
-        DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
-    out.writeInt(procDesc.length);
-    out.write(procDesc);
-    out.writeInt(inputSpecList.size());
-    for (InputSpec inputSpec : inputSpecList) {
-      inputSpec.write(out);
-    }
-    out.writeInt(outputSpecList.size());
-    for (OutputSpec outputSpec : outputSpecList) {
-      outputSpec.write(out);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    taskAttemptId = new TezTaskAttemptID();
-    taskAttemptId.readFields(in);
-    vertexName = in.readUTF();
-    int procDescLength = in.readInt();
-    // TODO at least 3 buffer copies here. Need to convert this to full PB
-    // TEZ-305
-    byte[] procDescBytes = new byte[procDescLength];
-    in.readFully(procDescBytes);
-    processorDescriptor =
-        DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
-            TezEntityDescriptorProto.parseFrom(procDescBytes));
-    int numInputSpecs = in.readInt();
-    inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
-    for (int i = 0; i < numInputSpecs; i++) {
-      InputSpec inputSpec = new InputSpec();
-      inputSpec.readFields(in);
-      inputSpecList.add(inputSpec);
-    }
-    int numOutputSpecs = in.readInt();
-    outputSpecList = new ArrayList<OutputSpec>(numOutputSpecs);
-    for (int i = 0; i < numOutputSpecs; i++) {
-      OutputSpec outputSpec = new OutputSpec();
-      outputSpec.readFields(in);
-      outputSpecList.add(outputSpec);
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuffer sb = new StringBuffer();
-    sb.append("TaskAttemptID:" + taskAttemptId);
-    sb.append("processorName=" + processorDescriptor.getClassName()
-        + ", inputSpecListSize=" + inputSpecList.size()
-        + ", outputSpecListSize=" + outputSpecList.size());
-    sb.append(", inputSpecList=[");
-    for (InputSpec i : inputSpecList) {
-      sb.append("{" + i.toString() + "}, ");
-    }
-    sb.append("], outputSpecList=[");
-    for (OutputSpec i : outputSpecList) {
-      sb.append("{" + i.toString() + "}, ");
-    }
-    sb.append("]");
-    return sb.toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
deleted file mode 100644
index 0f65750..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.events.EventProtos.DataMovementEventProto;
-import org.apache.tez.engine.api.events.EventProtos.InputFailedEventProto;
-import org.apache.tez.engine.api.events.EventProtos.InputInformationEventProto;
-import org.apache.tez.engine.api.events.EventProtos.InputReadErrorEventProto;
-import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
-import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
-import org.apache.tez.engine.newapi.events.InputInformationEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.events.TaskAttemptCompletedEvent;
-import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
-import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
-
-import com.google.protobuf.ByteString;
-
-public class TezEvent implements Writable {
-
-  private EventType eventType;
-
-  private Event event;
-
-  private EventMetaData sourceInfo;
-
-  private EventMetaData destinationInfo;
-
-  public TezEvent() {
-  }
-
-  public TezEvent(Event event, EventMetaData sourceInfo) {
-    this.event = event;
-    this.setSourceInfo(sourceInfo);
-    if (event instanceof DataMovementEvent) {
-      eventType = EventType.DATA_MOVEMENT_EVENT;
-    } else if (event instanceof InputReadErrorEvent) {
-      eventType = EventType.INPUT_READ_ERROR_EVENT;
-    } else if (event instanceof TaskAttemptFailedEvent) {
-      eventType = EventType.TASK_ATTEMPT_FAILED_EVENT;
-    } else if (event instanceof TaskAttemptCompletedEvent) {
-      eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT;
-    } else if (event instanceof InputInformationEvent) {
-      eventType = EventType.INTPUT_INFORMATION_EVENT;
-    } else if (event instanceof InputFailedEvent) {
-      eventType = EventType.INPUT_FAILED_EVENT;
-    } else if (event instanceof TaskStatusUpdateEvent) {
-      eventType = EventType.TASK_STATUS_UPDATE_EVENT;
-    } else {
-      throw new TezUncheckedException("Unknown event, event="
-          + event.getClass().getName());
-    }
-  }
-
-  public Event getEvent() {
-    return event;
-  }
-
-  public EventMetaData getSourceInfo() {
-    return sourceInfo;
-  }
-
-  public void setSourceInfo(EventMetaData sourceInfo) {
-    this.sourceInfo = sourceInfo;
-  }
-
-  public EventMetaData getDestinationInfo() {
-    return destinationInfo;
-  }
-
-  public void setDestinationInfo(EventMetaData destinationInfo) {
-    this.destinationInfo = destinationInfo;
-  }
-
-  public EventType getEventType() {
-    return eventType;
-  }
-
-  private void serializeEvent(DataOutput out) throws IOException {
-    if (event == null) {
-      out.writeBoolean(false);
-      return;
-    }
-    out.writeBoolean(true);
-    out.writeInt(eventType.ordinal());
-    if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
-      // TODO NEWTEZ convert to PB
-      TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
-      sEvt.write(out);
-    } else {
-      byte[] eventBytes = null;
-      switch (eventType) {
-      case DATA_MOVEMENT_EVENT:
-        DataMovementEvent dmEvt = (DataMovementEvent) event;
-        eventBytes = DataMovementEventProto.newBuilder()
-          .setSourceIndex(dmEvt.getSourceIndex())
-          .setTargetIndex(dmEvt.getTargetIndex())
-          .setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()))
-          .build().toByteArray();
-        break;
-      case INPUT_READ_ERROR_EVENT:
-        InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
-        eventBytes = InputReadErrorEventProto.newBuilder()
-            .setIndex(ideEvt.getIndex())
-            .setDiagnostics(ideEvt.getDiagnostics())
-            .build().toByteArray();
-        break;
-      case TASK_ATTEMPT_FAILED_EVENT:
-        TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
-        eventBytes = TaskAttemptFailedEventProto.newBuilder()
-            .setDiagnostics(tfEvt.getDiagnostics())
-            .build().toByteArray();
-        break;
-      case TASK_ATTEMPT_COMPLETED_EVENT:
-        eventBytes = TaskAttemptCompletedEventProto.newBuilder()
-            .build().toByteArray();
-        break;
-      case INPUT_FAILED_EVENT:
-        InputFailedEvent ifEvt = (InputFailedEvent) event;
-        eventBytes = InputFailedEventProto.newBuilder()
-            .setSourceIndex(ifEvt.getSourceIndex())
-            .setTargetIndex(ifEvt.getTargetIndex())
-            .setVersion(ifEvt.getVersion()).build().toByteArray();
-      case INTPUT_INFORMATION_EVENT:
-        InputInformationEvent iEvt = (InputInformationEvent) event;
-        eventBytes = InputInformationEventProto.newBuilder()
-            .setUserPayload(ByteString.copyFrom(iEvt.getUserPayload()))
-            .build().toByteArray();
-      default:
-        throw new TezUncheckedException("Unknown TezEvent"
-           + ", type=" + eventType);
-      }
-      out.writeInt(eventBytes.length);
-      out.write(eventBytes);
-    }
-  }
-
-  private void deserializeEvent(DataInput in) throws IOException {
-    if (!in.readBoolean()) {
-      event = null;
-      return;
-    }
-    eventType = EventType.values()[in.readInt()];
-    if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
-      // TODO NEWTEZ convert to PB
-      event = new TaskStatusUpdateEvent();
-      ((TaskStatusUpdateEvent)event).readFields(in);
-    } else {
-      int eventBytesLen = in.readInt();
-      byte[] eventBytes = new byte[eventBytesLen];
-      in.readFully(eventBytes);
-      switch (eventType) {
-      case DATA_MOVEMENT_EVENT:
-        DataMovementEventProto dmProto =
-            DataMovementEventProto.parseFrom(eventBytes);
-        event = new DataMovementEvent(dmProto.getSourceIndex(),
-            dmProto.getTargetIndex(),
-            dmProto.getUserPayload().toByteArray());
-        break;
-      case INPUT_READ_ERROR_EVENT:
-        InputReadErrorEventProto ideProto =
-            InputReadErrorEventProto.parseFrom(eventBytes);
-        event = new InputReadErrorEvent(ideProto.getDiagnostics(),
-            ideProto.getIndex(), ideProto.getVersion());
-        break;
-      case TASK_ATTEMPT_FAILED_EVENT:
-        TaskAttemptFailedEventProto tfProto =
-            TaskAttemptFailedEventProto.parseFrom(eventBytes);
-        event = new TaskAttemptFailedEvent(tfProto.getDiagnostics());
-        break;
-      case TASK_ATTEMPT_COMPLETED_EVENT:
-        event = new TaskAttemptCompletedEvent();
-        break;
-      case INPUT_FAILED_EVENT:
-        InputFailedEventProto ifProto =
-            InputFailedEventProto.parseFrom(eventBytes);
-        event = new InputFailedEvent(ifProto.getSourceIndex(),
-            ifProto.getTargetIndex(), ifProto.getVersion());
-        break;
-      case INTPUT_INFORMATION_EVENT:
-        InputInformationEventProto infoProto =
-            InputInformationEventProto.parseFrom(eventBytes);
-        event = new InputInformationEvent(
-            infoProto.getUserPayload().toByteArray());
-        break;
-      default:
-        throw new TezUncheckedException("Unknown TezEvent"
-           + ", type=" + eventType);
-      }
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    serializeEvent(out);
-    if (sourceInfo != null) {
-      out.writeBoolean(true);
-      sourceInfo.write(out);
-    } else {
-      out.writeBoolean(false);
-    }
-    if (destinationInfo != null) {
-      out.writeBoolean(true);
-      destinationInfo.write(out);
-    } else {
-      out.writeBoolean(false);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    deserializeEvent(in);
-    if (in.readBoolean()) {
-      sourceInfo = new EventMetaData();
-      sourceInfo.readFields(in);
-    }
-    if (in.readBoolean()) {
-      destinationInfo = new EventMetaData();
-      destinationInfo.readFields(in);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
deleted file mode 100644
index 79a0968..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-
-public class TezHeartbeatRequest implements Writable {
-
-  private String containerIdentifier;
-  private List<TezEvent> events;
-  private TezTaskAttemptID currentTaskAttemptID;
-  private int startIndex;
-  private int maxEvents;
-  private long requestId;
-
-  public TezHeartbeatRequest() {
-  }
-
-  public TezHeartbeatRequest(long requestId, List<TezEvent> events,
-      String containerIdentifier, TezTaskAttemptID taskAttemptID,
-      int startIndex, int maxEvents) {
-    this.containerIdentifier = containerIdentifier;
-    this.requestId = requestId;
-    this.events = Collections.unmodifiableList(events);
-    this.startIndex = startIndex;
-    this.maxEvents = maxEvents;
-    this.currentTaskAttemptID = taskAttemptID;
-  }
-
-  public String getContainerIdentifier() {
-    return containerIdentifier;
-  }
-
-  public List<TezEvent> getEvents() {
-    return events;
-  }
-
-  public int getStartIndex() {
-    return startIndex;
-  }
-
-  public int getMaxEvents() {
-    return maxEvents;
-  }
-
-  public long getRequestId() {
-    return requestId;
-  }
-
-  public TezTaskAttemptID getCurrentTaskAttemptID() {
-    return currentTaskAttemptID;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    if (events != null) {
-      out.writeBoolean(true);
-      out.writeInt(events.size());
-      for (TezEvent e : events) {
-        e.write(out);
-      }
-    } else {
-      out.writeBoolean(false);
-    }
-    if (currentTaskAttemptID != null) {
-      out.writeBoolean(true);
-      currentTaskAttemptID.write(out);
-    } else {
-      out.writeBoolean(false);
-    }
-    out.writeInt(startIndex);
-    out.writeInt(maxEvents);
-    out.writeLong(requestId);
-    Text.writeString(out, containerIdentifier);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    if (in.readBoolean()) {
-      int eventsCount = in.readInt();
-      events = new ArrayList<TezEvent>(eventsCount);
-      for (int i = 0; i < eventsCount; ++i) {
-        TezEvent e = new TezEvent();
-        e.readFields(in);
-        events.add(e);
-      }
-    }
-    if (in.readBoolean()) {
-      currentTaskAttemptID = new TezTaskAttemptID();
-      currentTaskAttemptID.readFields(in);
-    } else {
-      currentTaskAttemptID = null;
-    }
-    startIndex = in.readInt();
-    maxEvents = in.readInt();
-    requestId = in.readLong();
-    containerIdentifier = Text.readString(in);
-  }
-
-  @Override
-  public String toString() {
-    return "{ "
-        + " containerId=" + containerIdentifier
-        + ", requestId=" + requestId
-        + ", startIndex=" + startIndex
-        + ", maxEventsToGet=" + maxEvents
-        + ", taskAttemptId" + currentTaskAttemptID
-        + ", eventCount=" + (events != null ? events.size() : 0)
-        + " }";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
deleted file mode 100644
index addd17f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-
-public class TezHeartbeatResponse implements Writable {
-
-  private long lastRequestId;
-  private boolean shouldDie = false;
-  private List<TezEvent> events;
-
-  public TezHeartbeatResponse() {
-  }
-
-  public TezHeartbeatResponse(List<TezEvent> events) {
-    this.events = Collections.unmodifiableList(events);
-  }
-
-  public List<TezEvent> getEvents() {
-    return events;
-  }
-
-  public boolean shouldDie() {
-    return shouldDie;
-  }
-
-  public long getLastRequestId() {
-    return lastRequestId;
-  }
-
-  public void setEvents(List<TezEvent> events) {
-    this.events = Collections.unmodifiableList(events);
-  }
-
-  public void setLastRequestId(long lastRequestId ) {
-    this.lastRequestId = lastRequestId;
-  }
-
-  public void setShouldDie() {
-    this.shouldDie = true;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(lastRequestId);
-    out.writeBoolean(shouldDie);
-    if(events != null) {
-      out.writeBoolean(true);
-      out.writeInt(events.size());
-      for (TezEvent e : events) {
-        e.write(out);
-      }
-    } else {
-      out.writeBoolean(false);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    lastRequestId = in.readLong();
-    shouldDie = in.readBoolean();
-    if(in.readBoolean()) {
-      int eventCount = in.readInt();
-      events = new ArrayList<TezEvent>(eventCount);
-      for (int i = 0; i < eventCount; ++i) {
-        TezEvent e = new TezEvent();
-        e.readFields(in);
-        events.add(e);
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "{ "
-        + " lastRequestId=" + lastRequestId
-        + ", shouldDie=" + shouldDie
-        + ", eventCount=" + (events != null ? events.size() : 0)
-        + " }";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
deleted file mode 100644
index daafc5a..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.TezInputContext;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-public class TezInputContextImpl extends TezTaskContextImpl
-    implements TezInputContext {
-
-  private final byte[] userPayload;
-  private final String sourceVertexName;
-  private final EventMetaData sourceInfo;
-
-  @Private
-  public TezInputContextImpl(Configuration conf, int appAttemptNumber,
-      TezUmbilical tezUmbilical, String taskVertexName,
-      String sourceVertexName, TezTaskAttemptID taskAttemptID,
-      TezCounters counters, byte[] userPayload,
-      RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata) {
-    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
-    this.userPayload = userPayload;
-    this.sourceVertexName = sourceVertexName;
-    this.sourceInfo = new EventMetaData(
-        EventProducerConsumerType.INPUT, taskVertexName, sourceVertexName,
-        taskAttemptID);
-    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", taskAttemptID
-        .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
-        getTaskIndex(), getTaskAttemptNumber(), sourceVertexName);
-  }
-
-  @Override
-  public void sendEvents(List<Event> events) {
-    List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
-    for (Event e : events) {
-      TezEvent tEvt = new TezEvent(e, sourceInfo);
-      tezEvents.add(tEvt);
-    }
-    tezUmbilical.addEvents(tezEvents);
-  }
-
-  @Override
-  public byte[] getUserPayload() {
-    return userPayload;
-  }
-
-  @Override
-  public String getSourceVertexName() {
-    return sourceVertexName;
-  }
-
-  @Override
-  public void fatalError(Throwable exception, String message) {
-    super.signalFatalError(exception, message, sourceInfo);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
deleted file mode 100644
index 9de41ae..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.TezOutputContext;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-public class TezOutputContextImpl extends TezTaskContextImpl
-    implements TezOutputContext {
-
-  private final byte[] userPayload;
-  private final String destinationVertexName;
-  private final EventMetaData sourceInfo;
-
-  @Private
-  public TezOutputContextImpl(Configuration conf, int appAttemptNumber,
-      TezUmbilical tezUmbilical, String taskVertexName,
-      String destinationVertexName,
-      TezTaskAttemptID taskAttemptID, TezCounters counters,
-      byte[] userPayload, RuntimeTask runtimeTask,
-      Map<String, ByteBuffer> serviceConsumerMetadata) {
-    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
-    this.userPayload = userPayload;
-    this.destinationVertexName = destinationVertexName;
-    this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
-        taskVertexName, destinationVertexName, taskAttemptID);
-    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", taskAttemptID
-        .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
-        getTaskIndex(), getTaskAttemptNumber(), destinationVertexName);
-  }
-
-  @Override
-  public void sendEvents(List<Event> events) {
-    List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
-    for (Event e : events) {
-      TezEvent tEvt = new TezEvent(e, sourceInfo);
-      tezEvents.add(tEvt);
-    }
-    tezUmbilical.addEvents(tezEvents);
-  }
-
-  @Override
-  public byte[] getUserPayload() {
-    return userPayload;
-  }
-
-  @Override
-  public String getDestinationVertexName() {
-    return destinationVertexName;
-  }
-
-  @Override
-  public void fatalError(Throwable exception, String message) {
-    super.signalFatalError(exception, message, sourceInfo);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
deleted file mode 100644
index d710f7a..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.nio.ByteBuffer;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-public class TezProcessorContextImpl extends TezTaskContextImpl
-  implements TezProcessorContext {
-
-  private final byte[] userPayload;
-  private final EventMetaData sourceInfo;
-
-  public TezProcessorContextImpl(Configuration conf, int appAttemptNumber,
-      TezUmbilical tezUmbilical, String vertexName,
-      TezTaskAttemptID taskAttemptID, TezCounters counters,
-      byte[] userPayload, RuntimeTask runtimeTask,
-      Map<String, ByteBuffer> serviceConsumerMetadata) {
-    super(conf, appAttemptNumber, vertexName, taskAttemptID,
-        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
-    this.userPayload = userPayload;
-    this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
-        taskVertexName, "", taskAttemptID);
-    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d", taskAttemptID
-        .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
-        getTaskIndex(), getTaskAttemptNumber());
-  }
-
-  @Override
-  public void sendEvents(List<Event> events) {
-    List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
-    for (Event e : events) {
-      TezEvent tEvt = new TezEvent(e, sourceInfo);
-      tezEvents.add(tEvt);
-    }
-    tezUmbilical.addEvents(tezEvents);
-  }
-
-  @Override
-  public byte[] getUserPayload() {
-    return userPayload;
-  }
-
-  @Override
-  public void setProgress(float progress) {
-    runtimeTask.setProgress(progress);
-  }
-
-  @Override
-  public void fatalError(Throwable exception, String message) {
-    super.signalFatalError(exception, message, sourceInfo);
-  }
-
-  @Override
-  public boolean canCommit() throws IOException {
-    return tezUmbilical.canCommit(this.taskAttemptID);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
deleted file mode 100644
index 1d17158..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.TezTaskContext;
-import org.apache.tez.engine.newruntime.RuntimeTask;
-
-public abstract class TezTaskContextImpl implements TezTaskContext {
-
-  private final Configuration conf;
-  protected final String taskVertexName;
-  protected final TezTaskAttemptID taskAttemptID;
-  private final TezCounters counters;
-  private String[] workDirs;
-  protected String uniqueIdentifier;
-  protected final RuntimeTask runtimeTask;
-  protected final TezUmbilical tezUmbilical;
-  private final Map<String, ByteBuffer> serviceConsumerMetadata;
-  private final int appAttemptNumber;
-
-  @Private
-  public TezTaskContextImpl(Configuration conf, int appAttemptNumber,
-      String taskVertexName, TezTaskAttemptID taskAttemptID,
-      TezCounters counters, RuntimeTask runtimeTask,
-      TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) {
-    this.conf = conf;
-    this.taskVertexName = taskVertexName;
-    this.taskAttemptID = taskAttemptID;
-    this.counters = counters;
-    // TODO Maybe change this to be task id specific at some point. For now
-    // Shuffle code relies on this being a path specified by YARN
-    this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS);
-    this.runtimeTask = runtimeTask;
-    this.tezUmbilical = tezUmbilical;
-    this.serviceConsumerMetadata = serviceConsumerMetadata;
-    // TODO NEWTEZ at some point dag attempt should not map to app attempt
-    this.appAttemptNumber = appAttemptNumber;
-  }
-
-  @Override
-  public ApplicationId getApplicationId() {
-    return taskAttemptID.getTaskID().getVertexID().getDAGId()
-        .getApplicationId();
-  }
-
-  @Override
-  public int getTaskIndex() {
-    return taskAttemptID.getTaskID().getId();
-  }
-
-  @Override
-  public int getDAGAttemptNumber() {
-    return appAttemptNumber;
-  }
-
-  @Override
-  public int getTaskAttemptNumber() {
-    return taskAttemptID.getId();
-  }
-
-  @Override
-  public String getDAGName() {
-    // TODO NEWTEZ Change to some form of the DAG name, for now using dagId as
-    // the unique identifier.
-    return taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
-  }
-
-  @Override
-  public String getTaskVertexName() {
-    return taskVertexName;
-  }
-
-
-  @Override
-  public TezCounters getCounters() {
-    return counters;
-  }
-
-  @Override
-  public String[] getWorkDirs() {
-    return Arrays.copyOf(workDirs, workDirs.length);
-  }
-
-  @Override
-  public String getUniqueIdentifier() {
-    return uniqueIdentifier;
-  }
-
-  @Override
-  public ByteBuffer getServiceConsumerMetaData(String serviceName) {
-    return (ByteBuffer) serviceConsumerMetadata.get(serviceName)
-        .asReadOnlyBuffer().rewind();
-  }
-
-  @Override
-  public ByteBuffer getServiceProviderMetaData(String serviceName) {
-    return AuxiliaryServiceHelper.getServiceDataFromEnv(
-        serviceName, System.getenv());
-  }
-
-  protected void signalFatalError(Throwable t, String message,
-      EventMetaData sourceInfo) {
-    runtimeTask.setFatalError(t, message);
-    String diagnostics;
-    if (t != null && message != null) {
-      diagnostics = "exceptionThrown=" + StringUtils.stringifyException(t)
-          + ", errorMessage=" + message;
-    } else if (t == null && message == null) {
-      diagnostics = "Unknown error";
-    } else {
-      diagnostics = t != null ?
-          "exceptionThrown=" + StringUtils.stringifyException(t)
-          : " errorMessage=" + message;
-    }
-    tezUmbilical.signalFatalError(taskAttemptID, diagnostics, sourceInfo);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
deleted file mode 100644
index 5889622..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public interface TezUmbilical {
-
-  public void addEvents(Collection<TezEvent> events);
-
-  public void signalFatalError(TezTaskAttemptID taskAttemptID,
-      String diagnostics,
-      EventMetaData sourceInfo);
-
-  public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index eb055b6..77299de 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -38,6 +38,16 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.api.impl.EventMetaData;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezInputContextImpl;
+import org.apache.tez.engine.api.impl.TezOutputContextImpl;
+import org.apache.tez.engine.api.impl.TezProcessorContextImpl;
+import org.apache.tez.engine.api.impl.TezUmbilical;
+import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.Input;
@@ -49,16 +59,6 @@ import org.apache.tez.engine.newapi.Processor;
 import org.apache.tez.engine.newapi.TezInputContext;
 import org.apache.tez.engine.newapi.TezOutputContext;
 import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.engine.newapi.impl.EventMetaData;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newapi.impl.InputSpec;
-import org.apache.tez.engine.newapi.impl.OutputSpec;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
-import org.apache.tez.engine.newapi.impl.TezEvent;
-import org.apache.tez.engine.newapi.impl.TezInputContextImpl;
-import org.apache.tez.engine.newapi.impl.TezOutputContextImpl;
-import org.apache.tez.engine.newapi.impl.TezProcessorContextImpl;
-import org.apache.tez.engine.newapi.impl.TezUmbilical;
 import org.apache.tez.engine.shuffle.common.ShuffleUtils;
 
 import com.google.common.base.Preconditions;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
index ee6cde8..22cbc7c 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
@@ -25,9 +25,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
-import org.apache.tez.engine.newapi.impl.TezEvent;
-import org.apache.tez.engine.newapi.impl.TezUmbilical;
+import org.apache.tez.engine.api.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezUmbilical;
 
 public abstract class RuntimeTask {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
deleted file mode 100644
index c673d16..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/runtime/RuntimeUtils.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.runtime;
-
-import java.lang.reflect.Constructor;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.api.Processor;
-import org.apache.tez.engine.task.RuntimeTask;
-
-public class RuntimeUtils {
-
-  private static final Log LOG = LogFactory.getLog(RuntimeUtils.class);
-
-  private static final Class<?>[] CONTEXT_ARRAY =
-      new Class[] { TezEngineTaskContext.class };
-  private static final Class<?>[] CONTEXT_INT_ARRAY =
-      new Class[] { TezEngineTaskContext.class, Integer.TYPE };
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
-    new ConcurrentHashMap<Class<?>, Constructor<?>>();
-
-  @SuppressWarnings("unchecked")
-  public static <T> T getNewInstance(Class<T> theClass,
-      TezEngineTaskContext context) {
-    T result;
-    try {
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(CONTEXT_ARRAY);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
-      }
-      result = meth.newInstance(context);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    return result;
-  }
-  
-  @SuppressWarnings("unchecked")
-  public static <T> T getNewInputInstance(Class<T> theClass,
-      TezEngineTaskContext context, int index) {
-    T result;
-    try {
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(CONTEXT_INT_ARRAY);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
-      }
-      result = meth.newInstance(context, index);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    return result;
-  }
-
-  public static RuntimeTask createRuntimeTask(
-      TezEngineTaskContext taskContext) {
-    LOG.info("Creating a runtime task from TaskContext"
-        + ", Processor: " + taskContext.getProcessorName()
-        + ", InputCount=" + taskContext.getInputSpecList().size()
-        + ", OutputCount=" + taskContext.getOutputSpecList().size());
-
-    RuntimeTask t = null;
-    try {
-      Class<?> processorClazz =
-          Class.forName(taskContext.getProcessorName());
-
-      Processor processor = (Processor) getNewInstance(
-          processorClazz, taskContext);
-
-      Input[] inputs;
-      Output[] outputs;
-      if (taskContext.getInputSpecList().isEmpty()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Initializing task with 0 inputs");
-        }
-        inputs = new Input[0];
-      } else {
-        int iSpecCount = taskContext.getInputSpecList().size();
-        inputs = new Input[iSpecCount];
-        for (int i = 0; i < iSpecCount; ++i) {
-          InputSpec inSpec = taskContext.getInputSpecList().get(i);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Using Input"
-                + ", index=" + i
-                + ", inputClass=" + inSpec.getInputClassName());
-          }
-          Class<?> inputClazz = Class.forName(inSpec.getInputClassName());
-          Input input = (Input) getNewInputInstance(inputClazz, taskContext, i);
-          inputs[i] = input;
-        }
-      }
-      if (taskContext.getOutputSpecList().isEmpty()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Initializing task with 0 outputs");
-        }
-        outputs = new Output[0];
-      } else {
-        int oSpecCount = taskContext.getOutputSpecList().size();
-        outputs = new Output[oSpecCount];
-        for (int i = 0; i < oSpecCount; ++i) {
-          OutputSpec outSpec = taskContext.getOutputSpecList().get(i);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Using Output"
-                + ", index=" + i
-                + ", output=" + outSpec.getOutputClassName());
-          }
-          Class<?> outputClazz = Class.forName(outSpec.getOutputClassName());
-          Output output = (Output) getNewInstance(outputClazz, taskContext);
-          outputs[i] = output;
-        }
-      }
-      t = createRuntime(taskContext, processor, inputs, outputs);
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException("Unable to initialize RuntimeTask, context="
-          + taskContext, e);
-    }
-    return t;
-  }
-
-  private static RuntimeTask createRuntime(TezEngineTaskContext taskContext,
-      Processor processor, Input[] inputs, Output[] outputs) {
-    try {
-      // TODO Change this to use getNewInstance
-      Class<?> runtimeClazz = Class.forName(taskContext.getRuntimeName());
-      Constructor<?> ctor = runtimeClazz.getConstructor(
-          TezEngineTaskContext.class, Processor.class, Input[].class,
-          Output[].class);
-      ctor.setAccessible(true);
-      return (RuntimeTask) ctor.newInstance(taskContext, processor, inputs, outputs);
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException("Unable to load runtimeClass: "
-          + taskContext.getRuntimeName(), e);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
index e9bfe36..531e460 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/DiskFetchedInput.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
 
 import com.google.common.base.Preconditions;
 


Mime
View raw message