tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-227. Make per-vertex user payload available to the VertexOutputCommitter. (sseth)
Date Fri, 14 Jun 2013 08:20:28 GMT
Updated Branches:
  refs/heads/master 89754798f -> 0975207c8


TEZ-227. Make per-vertex user payload available to the
VertexOutputCommitter. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/0975207c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/0975207c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/0975207c

Branch: refs/heads/master
Commit: 0975207c89cd85064e2830bf8e9b6eb0c3598c8d
Parents: 8975479
Author: Siddharth Seth <sseth@apache.org>
Authored: Fri Jun 14 01:19:48 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Jun 14 01:19:48 2013 -0700

----------------------------------------------------------------------
 .../hadoop/mapred/MRVertexOutputCommitter.java  | 16 ++++++++++++--
 .../tez/dag/api/committer/VertexContext.java    |  4 ++++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 23 ++++++++++++++++++++
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  1 +
 4 files changed, 42 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0975207c/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
index 0121ab3..dad0287 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
@@ -40,6 +40,7 @@ import org.apache.tez.dag.api.committer.VertexOutputCommitter;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
 public class MRVertexOutputCommitter extends VertexOutputCommitter {
@@ -92,14 +93,25 @@ public class MRVertexOutputCommitter extends VertexOutputCommitter {
   }
 
   // FIXME we are using ApplicationId as DAG id
-  private JobContext getJobContextFromVertexContext(VertexContext context) {
+  private JobContext getJobContextFromVertexContext(VertexContext context)
+      throws IOException {
     // FIXME when we have the vertex level user-land configuration
     // jobConf should be initialized using the user-land level configuration
     // for the vertex in question
-    JobConf jobConf = new JobConf(context.getConf());
+
+    Configuration conf = null;
+
+    if (context.getUserPayload() != null) {
+      conf = MRHelpers.createConfFromByteBuffer(context.getUserPayload());
+    } else {
+      conf = context.getConf();
+    }
+
+    JobConf jobConf = new JobConf(conf);
     JobID jobId = TypeConverter.fromYarn(context.getDAGId().getApplicationId());
     jobConf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
     return new MRJobContextImpl(jobConf, jobId);
+
   }
 
   private State getJobStateFromVertexStatusState(VertexStatus.State state) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0975207c/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
index 14880d2..fb50923 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/committer/VertexContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.api.committer;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.dag.records.TezDAGID;
@@ -29,6 +31,8 @@ public interface VertexContext {
 
   public TezDAGID getDAGId();
   
+  public ByteBuffer getUserPayload();
+  
   // TODO Get rid of this as part of VertexContext cleanup.
   public ApplicationAttemptId getApplicationAttemptId();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0975207c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ffaf8d0..fd930d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -104,6 +105,7 @@ import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
 
 
 /** Implementation of Vertex interface. Maintains the state machines of Vertex.
@@ -1328,4 +1330,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   VertexScheduler getVertexScheduler() {
     return this.vertexScheduler;
   }
+
+  @Override
+  public ByteBuffer getUserPayload() {
+    for (VertexPlan vertexPlan : getDAG().getJobPlan().getVertexList()) {
+      if (vertexPlan.getName().equals(vertexName)) {
+        if (!vertexPlan.getProcessorDescriptor().hasUserPayload()) {
+          return null;
+        } else {
+          // Needs to be a ByteBuffer which allows array(). PB returns a
+          // read-only buffer, whose array() call would throw.
+          ByteString byteString = vertexPlan.getProcessorDescriptor()
+              .getUserPayload();
+          int capacity = byteString.asReadOnlyByteBuffer().rewind().remaining();
+          byte[] b = new byte[capacity];
+          byteString.asReadOnlyByteBuffer().get(b, 0, capacity);
+          return ByteBuffer.wrap(b);
+        }
+      }
+    }
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0975207c/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index af8b1e1..c811060 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -475,6 +475,7 @@ public class TestVertexImpl {
     DAG dag = mock(DAG.class);
     doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
     doReturn(dag).when(appContext).getDAG();
+    doReturn(DAGPlan.getDefaultInstance()).when(dag).getJobPlan();
     doReturn(dagId).when(appContext).getDAGID();
     doReturn(dagId).when(dag).getID();
     setupVertices();


Mime
View raw message