tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject svn commit: r1469642 [35/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...
Date Thu, 18 Apr 2013 23:54:28 GMT
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test helpers that build Mockito mocks of the Tez id records
+ * (DAG id, vertex id, task id, task-attempt id) from plain integers.
+ */
+public class TezTestUtils {
+
+  /**
+   * Builds a mock task-attempt id.
+   *
+   * @param jobId         id used for the mocked DAG id / application id
+   * @param taskId        value returned by the mocked task id's {@code getId()}
+   * @param taskAttemptId value returned by the mocked attempt's {@code getId()}
+   * @param type          MAP selects the "m" marker and vertex 0, anything
+   *                      else selects "r" and vertex 1
+   * @return mock whose {@code getTaskID()}, {@code getId()} and
+   *         {@code toString()} are stubbed
+   */
+  public static TezTaskAttemptID getMockTaskAttemptId(
+      int jobId, int taskId, int taskAttemptId, MRTaskType type) {
+    TezTaskAttemptID taskAttemptID = mock(TezTaskAttemptID.class);
+    TezTaskID taskID = getMockTaskId(jobId, taskId, type);
+    when(taskAttemptID.getTaskID()).thenReturn(taskID);
+    when(taskAttemptID.getId()).thenReturn(taskAttemptId);
+    when(taskAttemptID.toString()).thenReturn(
+        "attempt_tez_" + Integer.toString(jobId) + "_" +
+        ((type == MRTaskType.MAP) ? "m" : "r") + "_" +
+        Integer.toString(taskId) + "_" + Integer.toString(taskAttemptId)
+        );
+    return taskAttemptID;
+  }
+
+  /**
+   * Builds a mock task id whose {@code getVertexID()} returns a mock vertex id
+   * for {@code jobId}/{@code type} and whose {@code getId()} returns
+   * {@code taskId}.
+   */
+  public static TezTaskID getMockTaskId(int jobId, int taskId, MRTaskType type) {
+    TezVertexID vertexID = getMockVertexId(jobId, type);
+    TezTaskID taskID = mock(TezTaskID.class);
+    when(taskID.getVertexID()).thenReturn(vertexID);
+    when(taskID.getId()).thenReturn(taskId);
+    return taskID;
+  }
+
+  /**
+   * Builds a mock DAG id. Its {@code getId()} returns {@code jobId} and its
+   * {@code getApplicationId()} returns an ApplicationId record with cluster
+   * timestamp 0 and id {@code jobId}.
+   */
+  public static TezDAGID getMockJobId(int jobId) {
+    TezDAGID jobID = mock(TezDAGID.class);
+    ApplicationId appId = Records.newRecord(ApplicationId.class);
+    appId.setClusterTimestamp(0L);
+    appId.setId(jobId);
+    when(jobID.getId()).thenReturn(jobId);
+    when(jobID.getApplicationId()).thenReturn(appId);
+    return jobID;
+  }
+
+  /**
+   * Builds a mock vertex id: {@code getDAGId()} returns a mock DAG id for
+   * {@code jobId}; {@code getId()} returns 0 for MAP and 1 otherwise.
+   */
+  public static TezVertexID getMockVertexId(int jobId, MRTaskType type) {
+    // Create and fully stub the DAG-id mock BEFORE opening a stubbing on the
+    // vertex mock. Creating/stubbing another mock inside the argument of
+    // thenReturn(...) happens while when(...) is still pending, which Mockito
+    // reports as an unfinished stubbing.
+    TezDAGID dagID = getMockJobId(jobId);
+    TezVertexID vertexID = mock(TezVertexID.class);
+    when(vertexID.getDAGId()).thenReturn(dagID);
+    when(vertexID.getId()).thenReturn(type == MRTaskType.MAP ? 0 : 1);
+    return vertexID;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,117 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.processor;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.mapreduce.TezTestUtils;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.task.InitialTaskWithLocalSort;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Test-side helpers for producing a small SequenceFile input and pushing it
+ * through a map task created by a Guice-provided {@link TezEngineFactory}.
+ */
+public class MapUtils {
+
+  private static final Log LOG = LogFactory.getLog(MapUtils.class);
+
+  /**
+   * Points the job's input at {@code workDir}, writes ten records
+   * (random key below 1000, value counting down from "10" to "1") to
+   * {@code file}, and returns the first split computed for the job.
+   *
+   * @throws IOException if writing the file or computing splits fails
+   */
+  private static InputSplit createInputSplit(
+      FileSystem fs, Path workDir, JobConf job, Path file) throws IOException {
+    FileInputFormat.setInputPaths(job, workDir);
+
+    SequenceFile.Writer out =
+        SequenceFile.createWriter(fs, job, file, LongWritable.class, Text.class);
+    try {
+      Random rng = new Random(System.currentTimeMillis());
+      LongWritable k = new LongWritable();
+      Text v = new Text();
+      int remaining = 10;
+      while (remaining > 0) {
+        k.set(rng.nextInt(1000));
+        v.set(Integer.toString(remaining));
+        out.append(k, v);
+        LOG.info("<k, v> : <" + k.get() + ", " + v + ">");
+        remaining--;
+      }
+    } finally {
+      out.close();
+    }
+
+    InputSplit[] allSplits =
+        new SequenceFileInputFormat<LongWritable, Text>().getSplits(job, 1);
+    InputSplit first = allSplits[0];
+    String[] locations = first.getLocations();
+    // NOTE(review): the "off = " field prints the split length, not an offset.
+    System.err.println("#split = " + allSplits.length + " ; "
+        + "#locs = " + locations.length + "; "
+        + "loc = " + locations[0] + "; "
+        + "off = " + first.getLength() + "; "
+        + "file = " + ((FileSplit) first).getPath());
+    return first;
+  }
+
+  /**
+   * Creates the test input, builds a map task for attempt
+   * (job 0, task {@code mapId}, attempt 0) through the factory bound in
+   * {@code taskModule}, initializes it, and runs its processor against a spy
+   * of the task's input that hands back the generated split.
+   *
+   * @return the task after processing; the caller decides when to close it
+   */
+  public static Task runMapProcessor(FileSystem fs, Path workDir,
+      JobConf jobConf,
+      int mapId, Path mapInput, AbstractModule taskModule,
+      TezTaskUmbilicalProtocol umbilical)
+      throws Exception {
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    InputSplit generatedSplit =
+        createInputSplit(fs, workDir, jobConf, mapInput);
+
+    TezEngineTask context = new TezEngineTask(
+        TezTestUtils.getMockTaskAttemptId(0, mapId, 0, MRTaskType.MAP),
+        "tez", "tez", "TODO_vertexName",
+        InitialTaskWithLocalSort.class.getName(), null, null);
+
+    TezEngineFactory engineFactory =
+        Guice.createInjector(taskModule).getInstance(TezEngineFactory.class);
+    Task task = engineFactory.createTask(context);
+    task.initialize(jobConf, umbilical);
+
+    // Spy on the real input so only the split lookup is overridden.
+    SimpleInput spiedInput = spy((SimpleInput) task.getInput());
+    doReturn(generatedSplit)
+        .when(spiedInput).getOldSplitDetails(any(TaskSplitIndex.class));
+
+    task.getProcessor().process(spiedInput, task.getOutput());
+    return task;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.processor.map;
+
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.lib.output.InMemorySortedOutput;
+import org.apache.tez.mapreduce.TestUmbilicalProtocol;
+import org.apache.tez.mapreduce.processor.MapUtils;
+import org.apache.tez.mapreduce.task.InitialTaskWithInMemSort;
+import org.apache.tez.mapreduce.task.InitialTaskWithLocalSort;
+import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.TruncatedChannelBuffer;
+import org.jboss.netty.handler.stream.ChunkedStream;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Runs a map task over generated input via {@link MapUtils} and checks its
+ * output, once with the local-sort task module and once with the in-memory
+ * sort task module.
+ */
+public class TestMapProcessor {
+
+  private static final Log LOG = LogFactory.getLog(TestMapProcessor.class);
+
+  JobConf job;
+
+  private static JobConf defaultConf = new JobConf();
+  private static FileSystem localFs = null;
+  static {
+    try {
+      defaultConf.set("fs.defaultFS", "file:///");
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  // Must stay after the static block above: it dereferences localFs.
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+             "TestMapProcessor").makeQualified(localFs);
+
+  TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
+
+  /** Builds a fresh per-test JobConf rooted at {@code workDir} with one reducer. */
+  @Before
+  public void setUp() {
+    job = new JobConf(defaultConf);
+    job.set(TezJobConfig.LOCAL_DIR, workDir.toString());
+    job.setClass(
+        Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+        TezLocalTaskOutputFiles.class,
+        TezTaskOutput.class);
+    job.setNumReduceTasks(1);
+    mapOutputs.setConf(job);
+  }
+
+  /**
+   * Runs the map with local sort, then reads the map output file back and
+   * verifies that keys appear in non-decreasing order.
+   */
+  @Test
+  public void testMapProcessor() throws Exception {
+    localFs.delete(workDir, true);
+    MapUtils.runMapProcessor(
+        localFs, workDir, job, 0, new Path(workDir, "map0"),
+        new InitialTaskWithLocalSort(), new TestUmbilicalProtocol()).close();
+
+    Path mapOutputFile = mapOutputs.getInputFile(0);
+    LOG.info("mapOutputFile = " + mapOutputFile);
+    IFile.Reader reader =
+        new IFile.Reader(job, localFs, mapOutputFile, null, null);
+    try {
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      DataInputBuffer keyBuf = new DataInputBuffer();
+      DataInputBuffer valueBuf = new DataInputBuffer();
+      // Long.MIN_VALUE is <= every key, so the first record always passes and
+      // every later record is compared with its predecessor. Previously the
+      // update of prev sat inside an "if (prev != Long.MIN_VALUE)" guard, so
+      // prev never changed and the ordering was never actually checked.
+      long prev = Long.MIN_VALUE;
+      while (reader.nextRawKey(keyBuf)) {
+        reader.nextRawValue(valueBuf);
+        key.readFields(keyBuf);
+        value.readFields(valueBuf);
+        // JUnit assertion instead of the assert keyword, which is skipped
+        // unless the JVM runs with -ea.
+        Assert.assertTrue(
+            "map output not sorted: " + prev + " precedes " + key.get(),
+            prev <= key.get());
+        prev = key.get();
+        LOG.info("key = " + key.get() + "; value = " + value);
+      }
+    } finally {
+      // Release the reader even when an assertion above fails.
+      reader.close();
+    }
+  }
+
+  /**
+   * Runs the map with in-memory sort and two partitions, then checks for each
+   * partition that the sorted stream yields the advertised number of bytes
+   * for chunk size 4096 and for chunk sizes 2, 4, ..., 128.
+   */
+  @Test
+  public void testMapProcessorWithInMemSort() throws Exception {
+    final int partitions = 2;
+    job.setNumReduceTasks(partitions);
+    job.setInt(TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE, partitions);
+
+    localFs.delete(workDir, true);
+    Task t =
+        MapUtils.runMapProcessor(
+            localFs, workDir, job, 0, new Path(workDir, "map0"),
+        new InitialTaskWithInMemSort(), new TestUmbilicalProtocol(true));
+    InMemorySortedOutput output = (InMemorySortedOutput)t.getOutput();
+
+    verifyInMemSortedStream(output, 0, 4096);
+    int i = 0;
+    for (i = 2; i < 256; i <<= 1) {
+      verifyInMemSortedStream(output, 0, i);
+    }
+    verifyInMemSortedStream(output, 1, 4096);
+    for (i = 2; i < 256; i <<= 1) {
+      verifyInMemSortedStream(output, 1, i);
+    }
+
+    t.close();
+  }
+
+  /**
+   * Drains the sorted stream of {@code partition} in chunks of
+   * {@code chunkSize} bytes and asserts that the total byte count equals the
+   * compressed length reported by the sorter's shuffle header.
+   */
+  private void verifyInMemSortedStream(
+      InMemorySortedOutput output, int partition, int chunkSize)
+          throws Exception {
+    ChunkedStream cs =
+        new ChunkedStream(
+            output.getSorter().getSortedStream(partition), chunkSize);
+    int actualBytes = 0;
+    ChannelBuffer b = null;
+    while ((b = (ChannelBuffer)cs.nextChunk()) != null) {
+      LOG.info("b = " + b);
+      // Count capacity() for truncated buffers; any other chunk is expected
+      // to be a BigEndianHeapChannelBuffer and contributes readableBytes().
+      actualBytes +=
+          (b instanceof TruncatedChannelBuffer) ?
+              ((TruncatedChannelBuffer)b).capacity() :
+              ((BigEndianHeapChannelBuffer)b).readableBytes();
+    }
+
+    LOG.info("verifyInMemSortedStream" +
+        " partition=" + partition +
+        " chunkSize=" + chunkSize +
+        " expected=" +
+        output.getSorter().getShuffleHeader(partition).getCompressedLength() +
+        " actual=" + actualBytes);
+    Assert.assertEquals(
+        output.getSorter().getShuffleHeader(partition).getCompressedLength(),
+        actualBytes);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.processor.reduce;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.InputSpec;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.mapreduce.TestUmbilicalProtocol;
+import org.apache.tez.mapreduce.TezTestUtils;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.processor.MapUtils;
+import org.apache.tez.mapreduce.task.InitialTaskWithLocalSort;
+import org.apache.tez.mapreduce.task.LocalFinalTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/**
+ * Runs a map task through {@link MapUtils} and then drives a reduce task,
+ * built from the {@link LocalFinalTask} module, over the same work directory.
+ */
+public class TestReduceProcessor {
+
+  private static final Log LOG = LogFactory.getLog(TestReduceProcessor.class);
+
+  JobConf job;
+
+  private static JobConf defaultConf = new JobConf();
+  private static FileSystem localFs = null;
+  static {
+    try {
+      defaultConf.set("fs.defaultFS", "file:///");
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  // Declared after the static block on purpose: it needs localFs.
+  private static Path workDir =
+    new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+             "TestReduceProcessor").makeQualified(localFs);
+
+  /** Prepares a per-test JobConf using {@code workDir} and a single reducer. */
+  @Before
+  public void setUp() {
+    job = new JobConf(defaultConf);
+    job.set(TezJobConfig.LOCAL_DIR, workDir.toString());
+    job.setClass(
+        Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+        TezLocalTaskOutputFiles.class,
+        TezTaskOutput.class);
+    job.setNumReduceTasks(1);
+  }
+
+  /**
+   * Map phase followed by reduce phase; passes when neither throws.
+   */
+  @Test
+  public void testReduceProcessor() throws Exception {
+    localFs.delete(workDir, true);
+
+    // Map phase.
+    // NOTE(review): the returned map task is not closed here, unlike in
+    // TestMapProcessor - confirm whether that is intentional.
+    Path mapInput = new Path(workDir, "map0");
+    MapUtils.runMapProcessor(
+        localFs, workDir, job, 0, mapInput,
+        new InitialTaskWithLocalSort(), new TestUmbilicalProtocol());
+
+    LOG.info("Starting reduce...");
+    FileOutputFormat.setOutputPath(job, new Path(workDir, "output"));
+
+    // Reduce phase: attempt (job 0, task 0, attempt 0) with one input spec.
+    TezEngineTask reduceContext = new TezEngineTask(
+        TezTestUtils.getMockTaskAttemptId(0, 0, 0, MRTaskType.REDUCE),
+        "tez", "tez", "TODO_vertexName",
+        LocalFinalTask.class.getName(),
+        Collections.singletonList(new InputSpec("TODO_srcVertexName", 1)),
+        null);
+    String attemptIdText = reduceContext.getTaskAttemptId().toString();
+    job.set(JobContext.TASK_ATTEMPT_ID, attemptIdText);
+
+    TezEngineFactory engineFactory =
+        Guice.createInjector(new LocalFinalTask())
+            .getInstance(TezEngineFactory.class);
+    Task reduceTask = engineFactory.createTask(reduceContext);
+    reduceTask.initialize(job, new TestUmbilicalProtocol());
+    reduceTask.run();
+    reduceTask.close();
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/resources/log4j.properties?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/resources/log4j.properties (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/resources/log4j.properties Thu Apr 18 23:54:18 2013
@@ -0,0 +1,19 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+# Root logger: INFO and above, sent to the "stdout" console appender.
+log4j.rootLogger=info,stdout
+# Repository-wide threshold. The key is spelled "threshold"; the earlier
+# "threshhold" spelling is not a key log4j 1.x looks up.
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-yarn-client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/pom.xml?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/pom.xml (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/pom.xml Thu Apr 18 23:54:18 2013
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<!-- Maven descriptor for the tez-yarn-client module; inherits from the
+     org.apache.tez:tez parent at 0.2.0-SNAPSHOT. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-yarn-client</artifactId>
+
+  <!-- No <version> elements are declared below; presumably they are supplied
+       by dependencyManagement in the parent POM (TODO confirm). -->
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+    </dependency>
+    <!-- JUnit is the only test-scoped dependency of this module. -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- Apache RAT plugin, declared with an empty configuration block. -->
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+
+/**
+ * Keeps one {@link ClientServiceDelegate} per {@link JobID}, all sharing a
+ * single history-server proxy that is created on first use.
+ */
+public class ClientCache {
+
+  private static final Log LOG = LogFactory.getLog(ClientCache.class);
+
+  private final Configuration conf;
+  private final ResourceMgrDelegate rm;
+
+  private Map<JobID, ClientServiceDelegate> cache =
+      new HashMap<JobID, ClientServiceDelegate>();
+
+  private MRClientProtocol hsProxy;
+
+  public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
+    this.conf = conf;
+    this.rm = rm;
+  }
+
+  /**
+   * Returns the delegate cached for {@code jobId}, creating and caching one
+   * on a miss. Entries are never evicted.
+   *
+   * @throws YarnException if creating the history-server proxy fails with an
+   *         IOException
+   */
+  //TODO: evict from the cache on some threshold
+  public synchronized ClientServiceDelegate getClient(JobID jobId) {
+    if (hsProxy == null) {
+      try {
+        hsProxy = instantiateHistoryProxy();
+      } catch (IOException e) {
+        LOG.warn("Could not connect to History server.", e);
+        throw new YarnException("Could not connect to History server.", e);
+      }
+    }
+
+    ClientServiceDelegate cached = cache.get(jobId);
+    if (cached != null) {
+      return cached;
+    }
+    ClientServiceDelegate created =
+        new ClientServiceDelegate(conf, rm, jobId, hsProxy);
+    cache.put(jobId, created);
+    return created;
+  }
+
+  /**
+   * Returns the shared history-server proxy, attempting to create it first
+   * when none is held yet. The result is still {@code null} when no history
+   * address is configured.
+   */
+  protected synchronized MRClientProtocol getInitializedHSProxy()
+      throws IOException {
+    if (hsProxy != null) {
+      return hsProxy;
+    }
+    hsProxy = instantiateHistoryProxy();
+    return hsProxy;
+  }
+
+  /**
+   * Creates a proxy to the address configured under
+   * {@link JHAdminConfig#MR_HISTORY_ADDRESS}, running the proxy creation as
+   * the current user.
+   *
+   * @return the proxy, or {@code null} when the configured address is empty
+   */
+  protected MRClientProtocol instantiateHistoryProxy()
+      throws IOException {
+    final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
+    if (StringUtils.isEmpty(serviceAddr)) {
+      return null;
+    }
+
+    LOG.debug("Connecting to HistoryServer at: " + serviceAddr);
+    final YarnRPC rpc = YarnRPC.create(conf);
+    // NOTE(review): this message is logged before the proxy below is created.
+    LOG.debug("Connected to HistoryServer at: " + serviceAddr);
+
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    PrivilegedAction<MRClientProtocol> openProxy =
+        new PrivilegedAction<MRClientProtocol>() {
+          @Override
+          public MRClientProtocol run() {
+            return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
+                NetUtils.createSocketAddr(serviceAddr), conf);
+          }
+        };
+    return currentUser.doAs(openProxy);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.*;
+import org.apache.hadoop.mapreduce.v2.api.records.*;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ClientToken;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+
+public class ClientServiceDelegate {
+  private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
+  // Placeholder compared against the reported AM host and the tracking URL.
+  private static final String UNAVAILABLE = "N/A";
+  // Cache key used when there is no application report to take a user from.
+  // Declared final: it is a constant and must not be reassigned.
+  private static final String UNKNOWN_USER = "Unknown User";
+
+  // Caches for per-user NotRunningJobs, keyed by job state and then by user.
+  // Assigned exactly once, in the constructor.
+  private final HashMap<JobState, HashMap<String, NotRunningJob>> notRunningJobs;
+
+  private final Configuration conf;
+  private final JobID jobId;
+  private final ApplicationId appId;
+  private final ResourceMgrDelegate rm;
+  private final MRClientProtocol historyServerProxy;
+  // Endpoint currently in use; reset to null to force a fresh lookup.
+  private MRClientProtocol realProxy = null;
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+  // Tracking URL taken from the most recent non-null application report.
+  private String trackingUrl;
+
+  // Ensures the "cannot reach the AM" notice is logged only once.
+  private boolean amAclDisabledStatusLogged = false;
+
+  public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
+      JobID jobId, MRClientProtocol historyServerProxy) {
+    this.conf = new Configuration(conf); // Cloning for modifying.
+    // For faster redirects from AM to HS.
+    this.conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+        this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
+            MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
+    this.rm = rm;
+    this.jobId = jobId;
+    this.historyServerProxy = historyServerProxy;
+    this.appId = TypeConverter.toYarn(jobId).getAppId();
+    notRunningJobs = new HashMap<JobState, HashMap<String, NotRunningJob>>();
+  }
+
+  // Get the instance of the NotRunningJob corresponding to the specified
+  // user and state
+  private NotRunningJob getNotRunningJob(ApplicationReport applicationReport,
+      JobState state) {
+    synchronized (notRunningJobs) {
+      HashMap<String, NotRunningJob> map = notRunningJobs.get(state);
+      if (map == null) {
+        map = new HashMap<String, NotRunningJob>();
+        notRunningJobs.put(state, map);
+      }
+      String user =
+          (applicationReport == null) ?
+              UNKNOWN_USER : applicationReport.getUser();
+      NotRunningJob notRunningJob = map.get(user);
+      if (notRunningJob == null) {
+        notRunningJob = new NotRunningJob(applicationReport, state);
+        map.put(user, notRunningJob);
+      }
+      return notRunningJob;
+    }
+  }
+
+  /**
+   * Resolves the endpoint that should serve requests for this job.
+   *
+   * A previously obtained proxy is reused. Otherwise the application report
+   * is fetched from the ResourceManager and, depending on its contents, this
+   * returns a proxy to the running ApplicationMaster, the history-server
+   * proxy, or a {@link NotRunningJob} stub. While the application is RUNNING
+   * but has no AM host yet, or the AM proxy cannot be created, the report is
+   * fetched again after a two second pause.
+   *
+   * If the thread is interrupted while waiting, its interrupt status is
+   * restored before the {@link YarnException} is thrown, so callers further
+   * up the stack can still observe the interruption.
+   *
+   * @return the endpoint to invoke. NOTE(review): this looks like it can be
+   *         {@code null} only for an application state that none of the
+   *         branches below handles -- confirm against YarnApplicationState.
+   * @throws YarnRemoteException declared for the ResourceManager lookup; also
+   *         thrown when the application report carries no user
+   */
+  private MRClientProtocol getProxy() throws YarnRemoteException {
+    if (realProxy != null) {
+      return realProxy;
+    }
+
+    // Possibly allow nulls through the PB tunnel, otherwise deal with an exception
+    // and redirect to the history server.
+    ApplicationReport application = rm.getApplicationReport(appId);
+    if (application != null) {
+      trackingUrl = application.getTrackingUrl();
+    }
+    InetSocketAddress serviceAddr = null;
+    while (application == null
+        || YarnApplicationState.RUNNING == application
+            .getYarnApplicationState()) {
+      if (application == null) {
+        LOG.info("Could not get Job info from RM for job " + jobId
+            + ". Redirecting to job history server.");
+        return checkAndGetHSProxy(null, JobState.NEW);
+      }
+      try {
+        if (application.getHost() == null || "".equals(application.getHost())) {
+          // RUNNING but no AM host published yet: wait, then re-fetch.
+          LOG.debug("AM not assigned to Job. Waiting to get the AM ...");
+          Thread.sleep(2000);
+
+          LOG.debug("Application state is " + application.getYarnApplicationState());
+          application = rm.getApplicationReport(appId);
+          continue;
+        } else if (UNAVAILABLE.equals(application.getHost())) {
+          if (!amAclDisabledStatusLogged) {
+            LOG.info("Job " + jobId + " is running, but the host is unknown."
+                + " Verify user has VIEW_JOB access.");
+            amAclDisabledStatusLogged = true;
+          }
+          return getNotRunningJob(application, JobState.RUNNING);
+        }
+        if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) {
+          UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
+              UserGroupInformation.getCurrentUser().getUserName());
+          serviceAddr = NetUtils.createSocketAddrForHost(
+              application.getHost(), application.getRpcPort());
+          if (UserGroupInformation.isSecurityEnabled()) {
+            ClientToken clientToken = application.getClientToken();
+            Token<ClientTokenIdentifier> token =
+                ProtoUtils.convertFromProtoFormat(clientToken, serviceAddr);
+            newUgi.addToken(token);
+          }
+          LOG.debug("Connecting to " + serviceAddr);
+          final InetSocketAddress finalServiceAddr = serviceAddr;
+          realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
+            @Override
+            public MRClientProtocol run() throws IOException {
+              return instantiateAMProxy(finalServiceAddr);
+            }
+          });
+        } else {
+          if (!amAclDisabledStatusLogged) {
+            LOG.info("Network ACL closed to AM for job " + jobId
+                + ". Not going to try to reach the AM.");
+            amAclDisabledStatusLogged = true;
+          }
+          return getNotRunningJob(null, JobState.RUNNING);
+        }
+        return realProxy;
+      } catch (IOException e) {
+        //possibly the AM has crashed
+        //there may be some time before AM is restarted
+        //keep retrying by getting the address from RM
+        LOG.info("Could not connect to " + serviceAddr +
+        ". Waiting for getting the latest AM address...");
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e1) {
+          LOG.warn("getProxy() call interruped", e1);
+          // Keep the interruption visible to callers.
+          Thread.currentThread().interrupt();
+          throw new YarnException(e1);
+        }
+        application = rm.getApplicationReport(appId);
+        if (application == null) {
+          LOG.info("Could not get Job info from RM for job " + jobId
+              + ". Redirecting to job history server.");
+          return checkAndGetHSProxy(null, JobState.RUNNING);
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("getProxy() call interruped", e);
+        // Keep the interruption visible to callers.
+        Thread.currentThread().interrupt();
+        throw new YarnException(e);
+      }
+    }
+
+    // The application is known and not RUNNING. Return immediately for an
+    // application that is still being allocated, so that job status can be
+    // reported without blocking on it.
+    String user = application.getUser();
+    if (user == null) {
+      throw RPCUtil.getRemoteException("User is not set in the application report");
+    }
+    if (application.getYarnApplicationState() == YarnApplicationState.NEW
+        || application.getYarnApplicationState() == YarnApplicationState.SUBMITTED
+        || application.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+      realProxy = null;
+      return getNotRunningJob(application, JobState.NEW);
+    }
+
+    if (application.getYarnApplicationState() == YarnApplicationState.FAILED) {
+      realProxy = null;
+      return getNotRunningJob(application, JobState.FAILED);
+    }
+
+    if (application.getYarnApplicationState() == YarnApplicationState.KILLED) {
+      realProxy = null;
+      return getNotRunningJob(application, JobState.KILLED);
+    }
+
+    //History server can serve a job only if application
+    //succeeded.
+    if (application.getYarnApplicationState() == YarnApplicationState.FINISHED) {
+      LOG.info("Application state is completed. FinalApplicationStatus="
+          + application.getFinalApplicationStatus().toString()
+          + ". Redirecting to job history server");
+      realProxy = checkAndGetHSProxy(application, JobState.SUCCEEDED);
+    }
+    return realProxy;
+  }
+
+  /**
+   * Picks the history-server proxy when one was supplied, otherwise falls
+   * back to a {@link NotRunningJob} stub for the given report and state.
+   *
+   * @param applicationReport report passed to the fallback stub; may be
+   *        {@code null}
+   * @param state job state the fallback stub will report
+   * @return the history-server proxy or a stub
+   */
+  private MRClientProtocol checkAndGetHSProxy(
+      ApplicationReport applicationReport, JobState state) {
+    if (historyServerProxy != null) {
+      return historyServerProxy;
+    }
+    LOG.warn("Job History Server is not configured.");
+    return getNotRunningJob(applicationReport, state);
+  }
+
+  /**
+   * Creates an {@link MRClientProtocol} proxy for the ApplicationMaster at
+   * the given address, using this delegate's configuration.
+   *
+   * @param serviceAddr address of the ApplicationMaster
+   * @return the newly created proxy
+   * @throws IOException declared for callers; see YarnRPC for failure modes
+   */
+  MRClientProtocol instantiateAMProxy(final InetSocketAddress serviceAddr)
+      throws IOException {
+    LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
+    MRClientProtocol amProxy = (MRClientProtocol) YarnRPC.create(conf)
+        .getProxy(MRClientProtocol.class, serviceAddr, conf);
+    LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
+    return amProxy;
+  }
+
+  private synchronized Object invoke(String method, Class argClass,
+      Object args) throws IOException {
+    Method methodOb = null;
+    try {
+      methodOb = MRClientProtocol.class.getMethod(method, argClass);
+    } catch (SecurityException e) {
+      throw new YarnException(e);
+    } catch (NoSuchMethodException e) {
+      throw new YarnException("Method name mismatch", e);
+    }
+    int maxRetries = this.conf.getInt(
+        MRJobConfig.MR_CLIENT_MAX_RETRIES,
+        MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+    IOException lastException = null;
+    while (maxRetries > 0) {
+      try {
+        return methodOb.invoke(getProxy(), args);
+      } catch (YarnRemoteException yre) {
+        LOG.warn("Exception thrown by remote end.", yre);
+        throw yre;
+      } catch (InvocationTargetException e) {
+        if (e.getTargetException() instanceof YarnRemoteException) {
+          LOG.warn("Error from remote end: " + e
+              .getTargetException().getLocalizedMessage());
+          LOG.debug("Tracing remote error ", e.getTargetException());
+          throw (YarnRemoteException) e.getTargetException();
+        }
+        LOG.debug("Failed to contact AM/History for job " + jobId + 
+            " retrying..", e.getTargetException());
+        // Force reconnection by setting the proxy to null.
+        realProxy = null;
+        // HS/AMS shut down
+        maxRetries--;
+        lastException = new IOException(e.getMessage());
+        
+      } catch (Exception e) {
+        LOG.debug("Failed to contact AM/History for job " + jobId
+            + "  Will retry..", e);
+        // Force reconnection by setting the proxy to null.
+        realProxy = null;
+        // RM shutdown
+        maxRetries--;
+        lastException = new IOException(e.getMessage());     
+      }
+    }
+    throw lastException;
+  }
+
+  /**
+   * Fetches the counters of a job and converts them to the
+   * {@code org.apache.hadoop.mapreduce} representation.
+   *
+   * @param arg0 the job whose counters are requested
+   * @return the converted counters
+   * @throws IOException if the call fails
+   */
+  public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
+  InterruptedException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId yarnJobId =
+        TypeConverter.toYarn(arg0);
+    GetCountersRequest countersRequest =
+        recordFactory.newRecordInstance(GetCountersRequest.class);
+    countersRequest.setJobId(yarnJobId);
+    GetCountersResponse response = (GetCountersResponse)
+        invoke("getCounters", GetCountersRequest.class, countersRequest);
+    Counters yarnCounters = response.getCounters();
+    return TypeConverter.fromYarn(yarnCounters);
+  }
+
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
+      throws IOException, InterruptedException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter
+        .toYarn(arg0);
+    GetTaskAttemptCompletionEventsRequest request = recordFactory
+        .newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
+    request.setJobId(jobID);
+    request.setFromEventId(arg1);
+    request.setMaxEvents(arg2);
+    List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent> list =
+      ((GetTaskAttemptCompletionEventsResponse) invoke(
+        "getTaskAttemptCompletionEvents", GetTaskAttemptCompletionEventsRequest.class, request)).
+        getCompletionEventList();
+    return TypeConverter
+        .fromYarn(list
+            .toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
+  }
+
+  /**
+   * Fetches the diagnostic messages recorded for a task attempt.
+   *
+   * @param arg0 the task attempt to query
+   * @return the diagnostics, in the order the response lists them
+   * @throws IOException if the call fails
+   */
+  public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
+      throws IOException, InterruptedException {
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptId =
+        TypeConverter.toYarn(arg0);
+    GetDiagnosticsRequest diagnosticsRequest =
+        recordFactory.newRecordInstance(GetDiagnosticsRequest.class);
+    diagnosticsRequest.setTaskAttemptId(yarnAttemptId);
+
+    GetDiagnosticsResponse response = (GetDiagnosticsResponse) invoke(
+        "getDiagnostics", GetDiagnosticsRequest.class, diagnosticsRequest);
+    List<String> lines = response.getDiagnosticsList();
+
+    String[] diagnostics = new String[lines.size()];
+    for (int idx = 0; idx < diagnostics.length; idx++) {
+      diagnostics[idx] = lines.get(idx).toString();
+    }
+    return diagnostics;
+  }
+  
+  /**
+   * Fetches the job report and converts it to a {@code JobStatus}.
+   *
+   * A report without a job file gets one from {@code MRApps.getJobFile}. The
+   * report's own tracking URL is preferred over the one remembered from the
+   * application report; unless the chosen URL equals {@code "N/A"} it is
+   * prefixed with {@code HttpConfig.getSchemePrefix()}.
+   *
+   * @param oldJobID the job to query
+   * @return the job status, or {@code null} when the response has no report
+   * @throws IOException if the call fails
+   */
+  public JobStatus getJobStatus(JobID oldJobID) throws IOException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId yarnJobId =
+      TypeConverter.toYarn(oldJobID);
+    GetJobReportRequest reportRequest =
+        recordFactory.newRecordInstance(GetJobReportRequest.class);
+    reportRequest.setJobId(yarnJobId);
+    GetJobReportResponse response = (GetJobReportResponse) invoke(
+        "getJobReport", GetJobReportRequest.class, reportRequest);
+    JobReport report = response.getJobReport();
+    if (report == null) {
+      return null;
+    }
+
+    if (StringUtils.isEmpty(report.getJobFile())) {
+      report.setJobFile(MRApps.getJobFile(conf, report.getUser(), oldJobID));
+    }
+
+    String historyTrackingUrl = report.getTrackingUrl();
+    String url;
+    if (StringUtils.isNotEmpty(historyTrackingUrl)) {
+      url = historyTrackingUrl;
+    } else {
+      url = trackingUrl;
+    }
+    if (!UNAVAILABLE.equals(url)) {
+      url = HttpConfig.getSchemePrefix() + url;
+    }
+    return TypeConverter.fromYarn(report, url);
+  }
+
+  /**
+   * Fetches the reports of all tasks of one type for a job.
+   *
+   * @param oldJobID the job to query
+   * @param taskType the type of task to report on
+   * @return the converted task reports
+   * @throws IOException if the call fails
+   */
+  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
+       throws IOException{
+    org.apache.hadoop.mapreduce.v2.api.records.JobId yarnJobId =
+      TypeConverter.toYarn(oldJobID);
+    GetTaskReportsRequest reportsRequest =
+        recordFactory.newRecordInstance(GetTaskReportsRequest.class);
+    reportsRequest.setJobId(yarnJobId);
+    reportsRequest.setTaskType(TypeConverter.toYarn(taskType));
+
+    GetTaskReportsResponse response = (GetTaskReportsResponse) invoke(
+        "getTaskReports", GetTaskReportsRequest.class, reportsRequest);
+    List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> yarnReports =
+        response.getTaskReportList();
+
+    List<org.apache.hadoop.mapreduce.TaskReport> converted =
+        TypeConverter.fromYarn(yarnReports);
+    return converted.toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
+  }
+
+  /**
+   * Asks the job's endpoint to kill or fail a task attempt.
+   *
+   * @param taskAttemptID the attempt to stop
+   * @param fail {@code true} to send a fail request, {@code false} to send a
+   *        kill request
+   * @return {@code true} whenever the call completes without an exception
+   * @throws IOException if the call fails
+   */
+  public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
+       throws IOException {
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptId
+      = TypeConverter.toYarn(taskAttemptID);
+    if (!fail) {
+      KillTaskAttemptRequest killRequest =
+          recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
+      killRequest.setTaskAttemptId(yarnAttemptId);
+      invoke("killTaskAttempt", KillTaskAttemptRequest.class, killRequest);
+      return true;
+    }
+    FailTaskAttemptRequest failRequest =
+        recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
+    failRequest.setTaskAttemptId(yarnAttemptId);
+    invoke("failTaskAttempt", FailTaskAttemptRequest.class, failRequest);
+    return true;
+  }
+
+  /**
+   * Sends a kill request for a job to the job's endpoint.
+   *
+   * @param oldJobID the job to kill
+   * @return {@code true} whenever the call completes without an exception
+   * @throws IOException if the call fails
+   */
+  public boolean killJob(JobID oldJobID)
+       throws IOException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId yarnJobId =
+        TypeConverter.toYarn(oldJobID);
+    KillJobRequest request =
+        recordFactory.newRecordInstance(KillJobRequest.class);
+    request.setJobId(yarnJobId);
+    invoke("killJob", KillJobRequest.class, request);
+    return true;
+  }
+
+  /**
+   * Builds the parameters needed to locate the logs of a finished job.
+   *
+   * With a task attempt id the parameters describe that attempt's container;
+   * without one they describe the container of the last AM listed in the job
+   * report.
+   *
+   * @param oldJobID the job whose logs are wanted
+   * @param oldTaskAttemptID the attempt whose logs are wanted, or
+   *        {@code null} for the AM logs
+   * @return container id, application id, node id and user for the logs
+   * @throws IOException if the job is not in the SUCCEEDED, FAILED, KILLED
+   *         or ERROR state, if the needed report or container information is
+   *         missing, or if the call fails
+   */
+  public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
+      throws YarnRemoteException, IOException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
+        TypeConverter.toYarn(oldJobID);
+    GetJobReportRequest request =
+        recordFactory.newRecordInstance(GetJobReportRequest.class);
+    request.setJobId(jobId);
+
+    JobReport report =
+        ((GetJobReportResponse) invoke("getJobReport",
+            GetJobReportRequest.class, request)).getJobReport();
+    // getJobStatus() treats a missing report as possible; report it with the
+    // declared exception type rather than a NullPointerException.
+    if (report == null) {
+      throw new IOException("Unable to get log information for job: "
+          + oldJobID);
+    }
+    if (!EnumSet.of(JobState.SUCCEEDED, JobState.FAILED, JobState.KILLED,
+        JobState.ERROR).contains(report.getJobState())) {
+      throw new IOException("Cannot get log path for a in-progress job");
+    }
+
+    if (oldTaskAttemptID != null) {
+      GetTaskAttemptReportRequest taRequest =
+          recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
+      taRequest.setTaskAttemptId(TypeConverter.toYarn(oldTaskAttemptID));
+      TaskAttemptReport taReport =
+          ((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport",
+              GetTaskAttemptReportRequest.class, taRequest))
+              .getTaskAttemptReport();
+      // A missing attempt report is handled like missing container details.
+      if (taReport == null
+          || taReport.getContainerId() == null
+          || taReport.getNodeManagerHost() == null) {
+        throw new IOException("Unable to get log information for task: "
+            + oldTaskAttemptID);
+      }
+      return new LogParams(
+          taReport.getContainerId().toString(),
+          taReport.getContainerId().getApplicationAttemptId()
+              .getApplicationId().toString(),
+          BuilderUtils.newNodeId(taReport.getNodeManagerHost(),
+              taReport.getNodeManagerPort()).toString(), report.getUser());
+    }
+
+    if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
+      throw new IOException("Unable to get log information for job: "
+          + oldJobID);
+    }
+    // Use the last AM listed in the report.
+    AMInfo amInfo = report.getAMInfos().get(report.getAMInfos().size() - 1);
+    return new LogParams(
+        amInfo.getContainerId().toString(),
+        amInfo.getAppAttemptId().getApplicationId().toString(),
+        BuilderUtils.newNodeId(amInfo.getNodeManagerHost(),
+            amInfo.getNodeManagerPort()).toString(), report.getUser());
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,241 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+public class NotRunningJob implements MRClientProtocol {
+
+  private RecordFactory recordFactory =
+    RecordFactoryProvider.getRecordFactory(null);
+
+  private final JobState jobState;
+  private final ApplicationReport applicationReport;
+
+
+  private ApplicationReport getUnknownApplicationReport() {
+    ApplicationId unknownAppId = recordFactory
+        .newRecordInstance(ApplicationId.class);
+    ApplicationAttemptId unknownAttemptId = recordFactory
+        .newRecordInstance(ApplicationAttemptId.class);
+
+    // Setting AppState to NEW and finalStatus to UNDEFINED as they are never
+    // used for a non running job
+    return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId,
+        "N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
+        "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A");
+  }
+
+  /**
+   * Creates a stub that answers protocol calls for a job that is not
+   * running.
+   *
+   * @param applicationReport report to answer from; {@code null} is replaced
+   *        by a placeholder report
+   * @param jobState the state every job report from this stub will carry
+   */
+  NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
+    if (applicationReport != null) {
+      this.applicationReport = applicationReport;
+    } else {
+      this.applicationReport = getUnknownApplicationReport();
+    }
+    this.jobState = jobState;
+  }
+
+  /**
+   * Returns a newly created, unpopulated response; the request is ignored.
+   */
+  @Override
+  public FailTaskAttemptResponse failTaskAttempt(
+      FailTaskAttemptRequest request) throws YarnRemoteException {
+    return recordFactory.newRecordInstance(FailTaskAttemptResponse.class);
+  }
+
+  /**
+   * Returns a response whose counters were given an empty group map; the
+   * request is ignored.
+   */
+  @Override
+  public GetCountersResponse getCounters(GetCountersRequest request)
+      throws YarnRemoteException {
+    Counters emptyCounters = recordFactory.newRecordInstance(Counters.class);
+    emptyCounters.addAllCounterGroups(new HashMap<String, CounterGroup>());
+
+    GetCountersResponse response =
+        recordFactory.newRecordInstance(GetCountersResponse.class);
+    response.setCounters(emptyCounters);
+    return response;
+  }
+
+  /**
+   * Returns a response holding a single empty diagnostic string; the request
+   * is ignored.
+   */
+  @Override
+  public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
+      throws YarnRemoteException {
+    GetDiagnosticsResponse response =
+        recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
+    response.addDiagnostics("");
+    return response;
+  }
+
+  /**
+   * Builds a job report from the stored application report.
+   *
+   * The job id comes from the request and the state is the one this stub was
+   * created with; user, start time, diagnostics, name, tracking URL and
+   * finish time are copied from the application report.
+   */
+  @Override
+  public GetJobReportResponse getJobReport(GetJobReportRequest request)
+      throws YarnRemoteException {
+    final ApplicationReport source = applicationReport;
+    JobReport synthesized = recordFactory.newRecordInstance(JobReport.class);
+    synthesized.setJobId(request.getJobId());
+    synthesized.setJobState(jobState);
+    synthesized.setUser(source.getUser());
+    synthesized.setStartTime(source.getStartTime());
+    synthesized.setDiagnostics(source.getDiagnostics());
+    synthesized.setJobName(source.getName());
+    synthesized.setTrackingUrl(source.getTrackingUrl());
+    synthesized.setFinishTime(source.getFinishTime());
+
+    GetJobReportResponse response =
+        recordFactory.newRecordInstance(GetJobReportResponse.class);
+    response.setJobReport(synthesized);
+    return response;
+  }
+
+  /**
+   * Returns a response to which an empty list of completion events was
+   * added; the request is ignored.
+   */
+  @Override
+  public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
+      GetTaskAttemptCompletionEventsRequest request)
+      throws YarnRemoteException {
+    GetTaskAttemptCompletionEventsResponse response = recordFactory
+        .newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
+    response.addAllCompletionEvents(
+        new ArrayList<TaskAttemptCompletionEvent>());
+    return response;
+  }
+
+  @Override
+  public GetTaskAttemptReportResponse getTaskAttemptReport(
+      GetTaskAttemptReportRequest request) throws YarnRemoteException {
+    /*
+     * Unsupported on this stub: every call fails with
+     * NotImplementedException.
+     * NOTE(review): the earlier comment said this is never invoked, but
+     * ClientServiceDelegate.getLogFilePath() invokes "getTaskAttemptReport"
+     * on whatever endpoint getProxy() returns, which can be a
+     * NotRunningJob -- confirm whether this path is reachable.
+     */
+    throw new NotImplementedException();
+  }
+
+  /**
+   * Returns a placeholder report for the requested task: state NEW, counters
+   * given an empty group map, and an empty list of running attempts.
+   *
+   * @param request identifies the task; its task id is echoed in the report
+   * @return a response carrying the placeholder report
+   */
+  @Override
+  public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
+      throws YarnRemoteException {
+    GetTaskReportResponse resp =
+      recordFactory.newRecordInstance(GetTaskReportResponse.class);
+    TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
+    report.setTaskId(request.getTaskId());
+    report.setTaskState(TaskState.NEW);
+    Counters counters = recordFactory.newRecordInstance(Counters.class);
+    counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
+    report.setCounters(counters);
+    report.addAllRunningAttempts(new ArrayList<TaskAttemptId>());
+    // The report used to be built and then dropped, so callers received a
+    // response with no report attached.
+    resp.setTaskReport(report);
+    return resp;
+  }
+
+  /**
+   * Returns a response to which an empty list of task reports was added; the
+   * request is ignored.
+   */
+  @Override
+  public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
+      throws YarnRemoteException {
+    GetTaskReportsResponse response =
+        recordFactory.newRecordInstance(GetTaskReportsResponse.class);
+    response.addAllTaskReports(new ArrayList<TaskReport>());
+    return response;
+  }
+
+  /**
+   * Returns a newly created, unpopulated response; the request is ignored.
+   */
+  @Override
+  public KillJobResponse killJob(KillJobRequest request)
+      throws YarnRemoteException {
+    return recordFactory.newRecordInstance(KillJobResponse.class);
+  }
+
+  /**
+   * Returns a newly created, unpopulated response; the request is ignored.
+   */
+  @Override
+  public KillTaskResponse killTask(KillTaskRequest request)
+      throws YarnRemoteException {
+    return recordFactory.newRecordInstance(KillTaskResponse.class);
+  }
+
+  /**
+   * Returns a newly created, unpopulated response; the request is ignored.
+   */
+  @Override
+  public KillTaskAttemptResponse killTaskAttempt(
+      KillTaskAttemptRequest request) throws YarnRemoteException {
+    return recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
+  }
+
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws YarnRemoteException {
+    /*
+     * Unsupported on this stub: every call fails with
+     * NotImplementedException. Should not be invoked by anyone.
+     */
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnRemoteException {
+    /*
+     * Unsupported on this stub: every call fails with
+     * NotImplementedException. Should not be invoked by anyone.
+     */
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnRemoteException {
+    /*
+     * Unsupported on this stub: every call fails with
+     * NotImplementedException. Should not be invoked by anyone.
+     */
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public InetSocketAddress getConnectAddress() {
+    /*
+     * Unsupported on this stub: every call fails with
+     * NotImplementedException. Should not be invoked by anyone.
+     * Normally used to set token service.
+     */
+    throw new NotImplementedException();
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,168 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class ResourceMgrDelegate extends YarnClientImpl {
+  private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
+      
+  private YarnConfiguration conf;
+  private GetNewApplicationResponse application;
+  private ApplicationId applicationId;
+
+  /**
+   * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
+   * @param conf the configuration object.
+   */
+  public ResourceMgrDelegate(YarnConfiguration conf) {
+    super();
+    this.conf = conf;
+    init(conf);
+    start();
+  }
+
+  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+      InterruptedException {
+    return TypeConverter.fromYarnNodes(super.getNodeReports());
+  }
+
+  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+    return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf);
+  }
+
+  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+      InterruptedException {
+    // TODO: Implement getBlacklistedTrackers
+    LOG.warn("getBlacklistedTrackers - Not implemented yet");
+    return new TaskTrackerInfo[0];
+  }
+
+  public ClusterMetrics getClusterMetrics() throws IOException,
+      InterruptedException {
+    YarnClusterMetrics metrics = super.getYarnClusterMetrics();
+    ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, 
+        metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
+        metrics.getNumNodeManagers(), 0, 0);
+    return oldMetrics;
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Token getDelegationToken(Text renewer) throws IOException,
+      InterruptedException {
+    return ProtoUtils.convertFromProtoFormat(
+      super.getRMDelegationToken(renewer), rmAddress);
+  }
+
+  public String getFilesystemName() throws IOException, InterruptedException {
+    return FileSystem.get(conf).getUri().toString();
+  }
+
+  public JobID getNewJobID() throws IOException, InterruptedException {
+    this.application = super.getNewApplication();
+    this.applicationId = this.application.getApplicationId();
+    return TypeConverter.fromYarn(applicationId);
+  }
+
+  public QueueInfo getQueue(String queueName) throws IOException,
+  InterruptedException {
+    return TypeConverter.fromYarn(
+        super.getQueueInfo(queueName), this.conf);
+  }
+
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+      InterruptedException {
+    return TypeConverter.fromYarnQueueUserAclsInfo(super
+      .getQueueAclsInfo());
+  }
+
+  public QueueInfo[] getQueues() throws IOException, InterruptedException {
+    return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf);
+  }
+
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+    return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(), this.conf);
+  }
+
+  public QueueInfo[] getChildQueues(String parent) throws IOException,
+      InterruptedException {
+    return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent),
+      this.conf);
+  }
+
+  public String getStagingAreaDir() throws IOException, InterruptedException {
+//    Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
+    String user = 
+      UserGroupInformation.getCurrentUser().getShortUserName();
+    Path path = MRApps.getStagingAreaDir(conf, user);
+    LOG.debug("getStagingAreaDir: dir=" + path);
+    return path.toString();
+  }
+
+
+  public String getSystemDir() throws IOException, InterruptedException {
+    Path sysDir = new Path(MRJobConfig.JOB_SUBMIT_DIR);
+    //FileContext.getFileContext(conf).delete(sysDir, true);
+    return sysDir.toString();
+  }
+  
+
+  public long getTaskTrackerExpiryInterval() throws IOException,
+      InterruptedException {
+    return 0;
+  }
+  
+  public void setJobPriority(JobID arg0, String arg1) throws IOException,
+      InterruptedException {
+    return;
+  }
+
+
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return 0;
+  }
+
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message