tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-313. Change TezEntityDescriptor to not use ByteBuffer as it's not thread safe (bikas)
Date Mon, 22 Jul 2013 23:47:33 GMT
Updated Branches:
  refs/heads/master c19c62081 -> 92b2e2151


TEZ-313. Change TezEntityDescriptor to not use ByteBuffer as it's not thread safe (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/92b2e215
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/92b2e215
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/92b2e215

Branch: refs/heads/master
Commit: 92b2e2151566e026744bc6bf8ef2b423e4a2bbc9
Parents: c19c620
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Jul 22 16:45:48 2013 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Jul 22 16:45:48 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/DagTypeConverters.java   | 13 ++++-----
 .../org/apache/tez/dag/api/InputDescriptor.java |  4 +--
 .../apache/tez/dag/api/OutputDescriptor.java    |  4 +--
 .../apache/tez/dag/api/ProcessorDescriptor.java |  4 +--
 .../apache/tez/dag/api/TezEntityDescriptor.java |  8 ++----
 .../org/apache/tez/dag/api/TestDAGPlan.java     | 17 ++++-------
 .../tez/dag/api/committer/VertexContext.java    |  3 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  2 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 13 ++++-----
 .../java/org/apache/tez/engine/api/Task.java    |  3 +-
 .../apache/tez/common/TezEngineTaskContext.java |  3 +-
 .../org/apache/tez/engine/task/RuntimeTask.java |  5 ++--
 .../tez/mapreduce/examples/MRRSleepJob.java     |  6 ++--
 .../org/apache/tez/mapreduce/TestMRRJobs.java   |  6 ++--
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 10 +++----
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  | 13 ++++-----
 .../tez/mapreduce/task/MRRuntimeTask.java       | 30 ++------------------
 .../org/apache/tez/mapreduce/YARNRunner.java    |  2 +-
 18 files changed, 48 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 517b61e..c58280d 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -17,7 +17,6 @@
  */
 package org.apache.tez.dag.api;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -237,9 +236,9 @@ public class DagTypeConverters {
   public static InputDescriptor convertInputDescriptorFromDAGPlan(
       TezEntityDescriptorProto proto) {
     String className = proto.getClassName();
-    ByteBuffer bb = null;
+    byte[] bb = null;
     if (proto.hasUserPayload()) {
-      bb = proto.getUserPayload().asReadOnlyByteBuffer();
+      bb = proto.getUserPayload().toByteArray();
     }
     return new InputDescriptor(className, bb);
   }
@@ -247,9 +246,9 @@ public class DagTypeConverters {
   public static OutputDescriptor convertOutputDescriptorFromDAGPlan(
       TezEntityDescriptorProto proto) {
     String className = proto.getClassName();
-    ByteBuffer bb = null;
+    byte[] bb = null;
     if (proto.hasUserPayload()) {
-      bb = proto.getUserPayload().asReadOnlyByteBuffer();
+      bb =  proto.getUserPayload().toByteArray();
     }
     return new OutputDescriptor(className, bb);
   }
@@ -257,9 +256,9 @@ public class DagTypeConverters {
   public static ProcessorDescriptor convertProcessorDescriptorFromDAGPlan(
       TezEntityDescriptorProto proto) {
     String className = proto.getClassName();
-    ByteBuffer bb = null;
+    byte[] bb = null;
     if (proto.hasUserPayload()) {
-      bb = ByteBuffer.wrap(proto.getUserPayload().toByteArray());
+      bb = proto.getUserPayload().toByteArray();
     }
     return new ProcessorDescriptor(className, bb);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
index 3b3054a..829ff1e 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
@@ -18,12 +18,10 @@
 
 package org.apache.tez.dag.api;
 
-import java.nio.ByteBuffer;
-
 public class InputDescriptor extends TezEntityDescriptor {
   
   // TODO Fix dependencies so that this can be specified as a class.  
-  public InputDescriptor(String inputClassName, ByteBuffer userPayload) {
+  public InputDescriptor(String inputClassName, byte[] userPayload) {
     super(inputClassName, userPayload);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
index e0956d3..afa7586 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
@@ -18,12 +18,10 @@
 
 package org.apache.tez.dag.api;
 
-import java.nio.ByteBuffer;
-
 public class OutputDescriptor extends TezEntityDescriptor {
 
   // TODO Fix dependencies so that this can be specified as a class.
-  public OutputDescriptor(String outputClassName, ByteBuffer userPayload) {
+  public OutputDescriptor(String outputClassName, byte[] userPayload) {
     super(outputClassName, userPayload);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
index 9f2a564..0d04214 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
@@ -18,12 +18,10 @@
 
 package org.apache.tez.dag.api;
 
-import java.nio.ByteBuffer;
-
 public class ProcessorDescriptor extends TezEntityDescriptor {
 
   // TODO Fix dependencies so that this can be specified as a class.
-  public ProcessorDescriptor(String processorClassName, ByteBuffer userPayload) {
+  public ProcessorDescriptor(String processorClassName, byte[] userPayload) {
     super(processorClassName, userPayload);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
index 7283310..e78a5a1 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
@@ -18,19 +18,17 @@
 
 package org.apache.tez.dag.api;
 
-import java.nio.ByteBuffer;
-
 public abstract class TezEntityDescriptor {
 
-  private ByteBuffer userPayload;
+  private byte[] userPayload;
   private String className;
   
-  public TezEntityDescriptor(String className, ByteBuffer userPayload) {
+  public TezEntityDescriptor(String className, byte[] userPayload) {
     this.userPayload = userPayload;
     this.className = className;
   }
   
-  public ByteBuffer getUserPayload() {
+  public byte[] getUserPayload() {
     return this.userPayload;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index fdab79d..ce26b35 100644
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -24,7 +24,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -97,9 +96,9 @@ public class TestDAGPlan {
   public void testUserPayloadSerde() {
     DAG dag = new DAG("testDag");
     ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1",
-        ByteBuffer.wrap("processor1Bytes".getBytes()));
+        "processor1Bytes".getBytes());
     ProcessorDescriptor pd2 = new ProcessorDescriptor("processor2",
-        ByteBuffer.wrap("processor2Bytes".getBytes()));
+        "processor2Bytes".getBytes());
     Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1));
     Vertex v2 = new Vertex("v2", pd2, 1, Resource.newInstance(1024, 1));
     v1.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
@@ -108,9 +107,9 @@ public class TestDAGPlan {
         .setTaskLocalResources(new HashMap<String, LocalResource>());
 
     InputDescriptor inputDescriptor = new InputDescriptor("input",
-        ByteBuffer.wrap("inputBytes".getBytes()));
+        "inputBytes".getBytes());
     OutputDescriptor outputDescriptor = new OutputDescriptor("output",
-        ByteBuffer.wrap("outputBytes".getBytes()));
+        "outputBytes".getBytes());
     Edge edge = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE,
         SourceType.STABLE, outputDescriptor, inputDescriptor));
 
@@ -146,15 +145,11 @@ public class TestDAGPlan {
     assertEquals(1, edgePropertyMap.size());
     EdgeProperty edgeProperty = edgePropertyMap.values().iterator().next();
 
-    byte[] ib = new byte[edgeProperty.getEdgeDestination().getUserPayload()
-        .capacity()];
-    edgeProperty.getEdgeDestination().getUserPayload().get(ib);
+    byte[] ib = edgeProperty.getEdgeDestination().getUserPayload();
     assertEquals("inputBytes", new String(ib));
     assertEquals("input", edgeProperty.getEdgeDestination().getClassName());
 
-    byte[] ob = new byte[edgeProperty.getEdgeSource().getUserPayload()
-        .capacity()];
-    edgeProperty.getEdgeSource().getUserPayload().get(ob);
+    byte[] ob = edgeProperty.getEdgeSource().getUserPayload();
     assertEquals("outputBytes", new String(ob));
     assertEquals("output", edgeProperty.getEdgeSource().getClassName());
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
index fb50923..e837390 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.dag.api.committer;
 
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -31,7 +30,7 @@ public interface VertexContext {
 
   public TezDAGID getDAGId();
   
-  public ByteBuffer getUserPayload();
+  public byte[] getUserPayload();
   
   // TODO Get rid of this as part of VertexContext cleanup.
   public ApplicationAttemptId getApplicationAttemptId();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 28790fb..4412436 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -877,7 +877,7 @@ public class DAGAppMaster extends AbstractService {
     Service service;
     List<Service> dependencies = new ArrayList<Service>();
     AtomicInteger dependenciesStarted = new AtomicInteger(0);
-    boolean canStart = false;
+    volatile boolean canStart = false;
 
     @Override
     public void stateChanged(Service dependency) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3ad1e26..4459109 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -18,7 +18,6 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -402,13 +401,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.javaOpts = vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan
         .getTaskConfig().getJavaOpts() : null;
 
-    ByteBuffer bb = getUserPayload();
+    byte[] bb = getUserPayload();
     if (bb == null) {
       LOG.info("No user payload - falling back to default AM tez conf");
       userConf = conf;
     } else {
       try {
-        userConf = MRHelpers.createConfFromByteBuffer(bb);
+        userConf = MRHelpers.createConfFromUserPayload(bb);
       } catch (IOException e) {
         LOG.info("Failed to create user conf from ByteBuffer");
         throw new TezUncheckedException(
@@ -1438,7 +1437,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   @Override
-  public ByteBuffer getUserPayload() {
+  public byte[] getUserPayload() {
     for (VertexPlan vertexPlan : getDAG().getJobPlan().getVertexList()) {
       if (vertexPlan.getName().equals(vertexName)) {
         if (!vertexPlan.getProcessorDescriptor().hasUserPayload()) {
@@ -1448,10 +1447,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // readOnlyBuffer
           ByteString byteString = vertexPlan.getProcessorDescriptor()
               .getUserPayload();
-          int capacity = byteString.asReadOnlyByteBuffer().rewind().remaining();
-          byte[] b = new byte[capacity];
-          byteString.asReadOnlyByteBuffer().get(b, 0, capacity);
-          return ByteBuffer.wrap(b);
+          byte[] b = byteString.toByteArray();
+          return b;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
index 12b2702..8dc9601 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
@@ -18,7 +18,6 @@
 package org.apache.tez.engine.api;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -38,7 +37,7 @@ public interface Task {
    * @throws IOException
    * @throws InterruptedException
    */
-  public void initialize(Configuration conf, ByteBuffer userPayload,
+  public void initialize(Configuration conf, byte[] userPayload,
       Master master) throws IOException, InterruptedException;
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
index 7270866..c012928 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
@@ -21,7 +21,6 @@ package org.apache.tez.common;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -67,7 +66,7 @@ public class TezEngineTaskContext extends TezTaskContext {
     return processorDescriptor.getClassName();
   }
   
-  public ByteBuffer getProcessorUserPayload() {
+  public byte[] getProcessorUserPayload() {
     return processorDescriptor.getUserPayload();
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
index 4854fed..5631c78 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/task/RuntimeTask.java
@@ -18,7 +18,6 @@
 package org.apache.tez.engine.task;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezEngineTaskContext;
@@ -35,7 +34,7 @@ public class RuntimeTask implements Task {
   protected final Processor processor;
   
   protected TezEngineTaskContext taskContext;
-  protected ByteBuffer userPayload;
+  protected byte[] userPayload;
   protected Configuration conf;
   protected Master master;
   
@@ -50,7 +49,7 @@ public class RuntimeTask implements Task {
   }
 
   @Override
-  public void initialize(Configuration conf, ByteBuffer userPayload,
+  public void initialize(Configuration conf, byte[] userPayload,
       Master master) throws IOException, InterruptedException {
     this.conf = conf;
     this.userPayload = userPayload;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 6cef99c..71cab97 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -527,7 +527,7 @@ public class MRRSleepJob extends Configured implements Tool {
 
     Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
         MapProcessor.class.getName(),
-        MRHelpers.createByteBufferFromConf(mapStageConf)),
+        MRHelpers.createUserPayloadFromConf(mapStageConf)),
         numMapper,
         MRHelpers.getMapResource(mapStageConf));
     mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
@@ -550,7 +550,7 @@ public class MRRSleepJob extends Configured implements Tool {
             intermediateReduceStageConfs[i];
         Vertex ivertex = new Vertex("ireduce" + (i+1),
             new ProcessorDescriptor(ReduceProcessor.class.getName(),
-                MRHelpers.createByteBufferFromConf(iconf)),
+                MRHelpers.createUserPayloadFromConf(iconf)),
                 numIReducer,
                 MRHelpers.getReduceResource(iconf));
         ivertex.setJavaOpts(MRHelpers.getReduceJavaOpts(iconf));
@@ -566,7 +566,7 @@ public class MRRSleepJob extends Configured implements Tool {
     if (numReducer > 0) {
       finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
           ReduceProcessor.class.getName(),
-          MRHelpers.createByteBufferFromConf(finalReduceConf)),
+          MRHelpers.createUserPayloadFromConf(finalReduceConf)),
           numReducer,
           MRHelpers.getReduceResource(finalReduceConf));
       finalReduceVertex.setJavaOpts(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
index 66a92c7..2b9a35f 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
@@ -130,7 +130,7 @@ public class TestMRRJobs {
     }
   }
 
-  @Test (timeout = 300000)
+  @Test (timeout = 60000)
   public void testMRRSleepJob() throws IOException, InterruptedException,
       ClassNotFoundException {
     LOG.info("\n\n\nStarting testMRRSleepJob().");
@@ -254,7 +254,7 @@ public class TestMRRJobs {
     // TODO verify failed task diagnostics
   }
 
-  @Test (timeout = 300000)
+  @Test (timeout = 60000)
   public void testFailingAttempt() throws IOException, InterruptedException,
       ClassNotFoundException {
 
@@ -290,7 +290,7 @@ public class TestMRRJobs {
     // TODO verify failed task diagnostics
   }
 
-  @Test (timeout = 300000)
+  @Test (timeout = 60000)
   public void testMRRSleepJobWithCompression() throws IOException,
       InterruptedException, ClassNotFoundException {
     LOG.info("\n\n\nStarting testMRRSleepJobWithCompression().");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 51341a2..67be44a 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -270,23 +270,23 @@ public class TestMRRJobsDAGApi {
     DAG dag = new DAG("testMRRSleepJobDagSubmit");
     Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
         MapProcessor.class.getName(),
-        MRHelpers.createByteBufferFromConf(stage1Conf)),
+        MRHelpers.createUserPayloadFromConf(stage1Conf)),
         inputSplitInfo.getNumTasks(), Resource.newInstance(256, 1));
     Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
         ReduceProcessor.class.getName(),
-        MRHelpers.createByteBufferFromConf(stage2Conf)),
+        MRHelpers.createUserPayloadFromConf(stage2Conf)),
         1, Resource.newInstance(256, 1));
     Vertex stage11Vertex = new Vertex("map1", new ProcessorDescriptor(
         MapProcessor.class.getName(),
-        MRHelpers.createByteBufferFromConf(stage1Conf)),
+        MRHelpers.createUserPayloadFromConf(stage1Conf)),
         inputSplitInfo1.getNumTasks(),  Resource.newInstance(256, 1));
     Vertex stage22Vertex = new Vertex("ireduce1", new ProcessorDescriptor(
         ReduceProcessor.class.getName(),
-        MRHelpers.createByteBufferFromConf(stage22Conf)),  
+        MRHelpers.createUserPayloadFromConf(stage22Conf)),  
         2, Resource.newInstance(256, 1));
     Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
         ReduceProcessor.class.getName(),
-        MRHelpers.createByteBufferFromConf(stage3Conf)),
+        MRHelpers.createUserPayloadFromConf(stage3Conf)),
         1, Resource.newInstance(256, 1));
 
     LocalResource appJarLr = createLocalResource(remoteFs,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index e3f8c67..ad63f7d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -21,10 +21,8 @@ package org.apache.tez.mapreduce.hadoop;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Vector;
@@ -433,25 +431,24 @@ public class MRHelpers {
 
   @LimitedPrivate("Hive, Pig")
   @Unstable
-  public static ByteBuffer createByteBufferFromConf(Configuration conf)
+  public static byte[] createUserPayloadFromConf(Configuration conf)
       throws IOException {
     Preconditions.checkNotNull(conf, "Configuration must be specified");
     DataOutputBuffer dob = new DataOutputBuffer();
     conf.write(dob);
-    return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    return dob.getData();
   }
 
   @LimitedPrivate("Hive, Pig")
   @Unstable
-  public static Configuration createConfFromByteBuffer(ByteBuffer bb)
+  public static Configuration createConfFromUserPayload(byte[] bb)
       throws IOException {
     // TODO Avoid copy ?
-    Preconditions.checkNotNull(bb, "ByteBuffer must be specified");
+    Preconditions.checkNotNull(bb, "Bytes must be specified");
     DataInputBuffer dib = new DataInputBuffer();
-    dib.reset(bb.array(), 0, bb.capacity());
+    dib.reset(bb, 0, bb.length);
     Configuration conf = new Configuration(false);
     conf.readFields(dib);
-    bb.rewind();
     return conf;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
index 3e6a95a..15b6756 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
@@ -20,10 +20,8 @@ package org.apache.tez.mapreduce.task;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.URI;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -35,10 +33,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSError;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -78,7 +73,7 @@ public class MRRuntimeTask extends RuntimeTask {
   }
 
   @Override
-  public void initialize(Configuration conf, ByteBuffer userPayload,
+  public void initialize(Configuration conf, byte[] userPayload,
       Master master) throws IOException, InterruptedException {
 
     DeprecatedKeys.init();
@@ -92,7 +87,7 @@ public class MRRuntimeTask extends RuntimeTask {
       taskConf = MultiStageMRConfigUtil.getConfForVertex(mrConf,
           taskContext.getVertexName());
     } else {
-      taskConf = MRHelpers.createConfFromByteBuffer(userPayload);
+      taskConf = MRHelpers.createConfFromUserPayload(userPayload);
       copyTezConfigParameters(taskConf, conf);
     }
 
@@ -264,25 +259,4 @@ public class MRRuntimeTask extends RuntimeTask {
     }
   }
 
-  private static final FsPermission urw_gr = FsPermission
-      .createImmutable((short) 0640);
-
-  /**
-   * Write the task specific job-configuration file.
-   * 
-   * @throws IOException
-   */
-  private static void writeLocalJobFile(Path jobFile, JobConf conf)
-      throws IOException {
-    FileSystem localFs = FileSystem.getLocal(conf);
-    localFs.delete(jobFile);
-    OutputStream out = null;
-    try {
-      out = FileSystem.create(localFs, jobFile, urw_gr);
-      conf.writeXml(out);
-    } finally {
-      IOUtils.cleanup(LOG, out);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/92b2e215/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 56f2eb0..736563b 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -395,7 +395,7 @@ public class YARNRunner implements ClientProtocol {
     Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
         : MRHelpers.getReduceResource(stageConf);
     Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(
-        processorName, MRHelpers.createByteBufferFromConf(stageConf)),
+        processorName, MRHelpers.createUserPayloadFromConf(stageConf)),
         numTasks, taskResource);
 
     Map<String, String> taskEnv = new HashMap<String, String>();


Mime
View raw message