tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-696. Remove implicit copying of processor payload to input and output (bikas)
Date Wed, 16 Jul 2014 17:18:39 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 1f0dd5d14 -> 55524f0ad


TEZ-696. Remove implicit copying of processor payload to input and output (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/55524f0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/55524f0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/55524f0a

Branch: refs/heads/master
Commit: 55524f0ad6ef565046a848a04238952a55693fae
Parents: 1f0dd5d
Author: Bikas Saha <bikas@apache.org>
Authored: Wed Jul 16 10:18:25 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Wed Jul 16 10:18:25 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     | 12 ++-
 .../org/apache/tez/common/TezUserPayload.java   |  2 -
 .../processor/map/TestMapProcessor.java         | 78 +-------------------
 .../processor/reduce/TestReduceProcessor.java   | 10 ++-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  8 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 18 ++---
 6 files changed, 27 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/55524f0a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 00b5f82..f228dcd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,13 +16,17 @@ INCOMPATIBLE CHANGES
   TEZ-692. Unify job submission in either TezClient or TezSession
   TEZ-1130. Replace confusing names on Vertex API
   TEZ-1213. Fix parameter naming in TezJobConfig.
-    - Details at https://issues.apache.org/jira/browse/TEZ-1213?focusedCommentId=14039381&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14039381
+    - Details at https://issues.apache.org/jira/browse/TEZ-1213?focusedCommentId
+    =14039381&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpa
+    nel#comment-14039381
   TEZ-1080, TEZ-1272, TEZ-1279, TEZ-1266. Change YARNRunner to use EdgeConfigs. 
-    - Removes separation of runtime configs into input/ouput configs. Also refactors public methods used for this conversion.
+    - Removes separation of runtime configs into input/ouput configs. Also 
+    refactors public methods used for this conversion.
+  TEZ-696. Remove implicit copying of processor payload to input and output
 
-Release 0.4.0-incubating: 2014-04-05
+  Release 0.4.0-incubating: 2014-04-05
 
-ALL CHANGES
+  ALL CHANGES
 
   TEZ-932 addendum. Add missing license to file.
   TEZ-1001 addendum. Remove checked in jar. (sseth)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/55524f0a/tez-api/src/main/java/org/apache/tez/common/TezUserPayload.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUserPayload.java b/tez-api/src/main/java/org/apache/tez/common/TezUserPayload.java
index 9531e26..6606194 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezUserPayload.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUserPayload.java
@@ -18,8 +18,6 @@
 
 package org.apache.tez.common;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Helper class to hold user payload.
  */

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/55524f0a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index a91ad31..5ffee56 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.tez.common.MRFrameworkConfigs;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.mapreduce.TestUmbilical;
@@ -123,7 +124,9 @@ public class TestMapProcessor {
         new InputDescriptor(MRInputLegacy.class.getName())
             .setUserPayload(MRHelpers.createMRInputPayload(jobConf, null)),
         1);
-    OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
+    OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", 
+        new OutputDescriptor(LocalOnFileSorterOutput.class.getName())
+            .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
 
     LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
         new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
@@ -165,77 +168,4 @@ public class TestMapProcessor {
     }
     reader.close();
   }
-
-//  @Test
-//  @Ignore
-//  public void testMapProcessorWithInMemSort() throws Exception {
-//    
-//    String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
-//    
-//    final int partitions = 2;
-//    JobConf jobConf = new JobConf(defaultConf);
-//    jobConf.setNumReduceTasks(partitions);
-//    setUpJobConf(jobConf);
-//    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
-//    mapOutputs.setConf(jobConf);
-//    
-//    Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
-//    Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
-//        vertexName);
-//    
-//    JobConf job = new JobConf(stageConf);
-//
-//    job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
-//        "localized-resources").toUri().toString());
-//    localFs.delete(workDir, true);
-//    Task t =
-//        MapUtils.runMapProcessor(
-//            localFs, workDir, job, 0, new Path(workDir, "map0"), 
-//            new TestUmbilicalProtocol(true), vertexName, 
-//            Collections.singletonList(new InputSpec("NullVertex", 0,
-//                MRInput.class.getName())),
-//            Collections.singletonList(new OutputSpec("FakeVertex", 1,
-//                OldInMemorySortedOutput.class.getName()))
-//            );
-//    OldInMemorySortedOutput[] outputs = (OldInMemorySortedOutput[])t.getOutputs();
-//    
-//    verifyInMemSortedStream(outputs[0], 0, 4096);
-//    int i = 0;
-//    for (i = 2; i < 256; i <<= 1) {
-//      verifyInMemSortedStream(outputs[0], 0, i);
-//    }
-//    verifyInMemSortedStream(outputs[0], 1, 4096);
-//    for (i = 2; i < 256; i <<= 1) {
-//      verifyInMemSortedStream(outputs[0], 1, i);
-//    }
-//
-//    t.close();
-//  }
-//  
-//  private void verifyInMemSortedStream(
-//      OldInMemorySortedOutput output, int partition, int chunkSize) 
-//          throws Exception {
-//    ChunkedStream cs = 
-//        new ChunkedStream(
-//            output.getSorter().getSortedStream(partition), chunkSize);
-//    int actualBytes = 0;
-//    ChannelBuffer b = null;
-//    while ((b = (ChannelBuffer)cs.nextChunk()) != null) {
-//      LOG.info("b = " + b);
-//      actualBytes += 
-//          (b instanceof TruncatedChannelBuffer) ? 
-//              ((TruncatedChannelBuffer)b).capacity() :
-//              ((BigEndianHeapChannelBuffer)b).readableBytes();
-//    }
-//    
-//    LOG.info("verifyInMemSortedStream" +
-//    		" partition=" + partition + 
-//    		" chunkSize=" + chunkSize +
-//        " expected=" + 
-//    		output.getSorter().getShuffleHeader(partition).getCompressedLength() + 
-//        " actual=" + actualBytes);
-//    Assert.assertEquals(
-//        output.getSorter().getShuffleHeader(partition).getCompressedLength(), 
-//        actualBytes);
-//  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/55524f0a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index eff108b..b1b865d 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -133,7 +133,9 @@ public class TestReduceProcessor {
         new InputDescriptor(MRInputLegacy.class.getName())
             .setUserPayload(MRHelpers.createMRInputPayload(jobConf, null)),
         1);
-    OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
+    OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", 
+        new OutputDescriptor(LocalOnFileSorterOutput.class.getName()).
+          setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
     // Run a map
     LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
         mapInput, new TestUmbilical(), dagName, mapVertexName,
@@ -156,9 +158,11 @@ public class TestReduceProcessor {
         ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(jobConf));
     
     InputSpec reduceInputSpec = new InputSpec(mapVertexName,
-        new InputDescriptor(LocalMergedInput.class.getName()), 1);
+        new InputDescriptor(LocalMergedInput.class.getName())
+            .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
     OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex",
-        new OutputDescriptor(MROutputLegacy.class.getName()), 1);
+        new OutputDescriptor(MROutputLegacy.class.getName())
+            .setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);
 
     // Now run a reduce
     TaskSpec taskSpec = new TaskSpec(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/55524f0a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 9be87a4..881ae90 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -468,9 +468,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         taskSpec.getDAGName(), taskSpec.getVertexName(),
         inputSpec.getSourceVertexName(), taskSpec.getTaskAttemptID(),
         tezCounters, inputIndex,
-        inputSpec.getInputDescriptor().getUserPayload() == null ? taskSpec
-            .getProcessorDescriptor().getUserPayload() : inputSpec
-            .getInputDescriptor().getUserPayload(), this,
+        inputSpec.getInputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata, System.getenv(), initialMemoryDistributor,
         inputSpec.getInputDescriptor(), input, inputReadyTracker);
     return inputContext;
@@ -482,9 +480,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         taskSpec.getDAGName(), taskSpec.getVertexName(),
         outputSpec.getDestinationVertexName(), taskSpec.getTaskAttemptID(),
         tezCounters, outputIndex,
-        outputSpec.getOutputDescriptor().getUserPayload() == null ? taskSpec
-            .getProcessorDescriptor().getUserPayload() : outputSpec
-            .getOutputDescriptor().getUserPayload(), this,
+        outputSpec.getOutputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata, System.getenv(), initialMemoryDistributor,
         outputSpec.getOutputDescriptor());
     return outputContext;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/55524f0a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index ab80989..e5667c7 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -430,9 +430,6 @@ public class TestMRRJobsDAGApi {
     stage2Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
         MRRSleepJobPartitioner.class.getName());
 
-    JobConf stage22Conf = new JobConf(stage2Conf);
-    stage22Conf.setInt(MRJobConfig.NUM_REDUCES, 2);
-
     stage3Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
     stage3Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
     stage3Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
@@ -446,13 +443,10 @@ public class TestMRRJobsDAGApi {
 
     MRHelpers.translateVertexConfToTez(stage1Conf);
     MRHelpers.translateVertexConfToTez(stage2Conf);
-    MRHelpers.translateVertexConfToTez(stage22Conf);
-    // this also works stage22 as it sets up keys etc
     MRHelpers.translateVertexConfToTez(stage3Conf);
 
     MRHelpers.doJobClientMagic(stage1Conf);
     MRHelpers.doJobClientMagic(stage2Conf);
-    MRHelpers.doJobClientMagic(stage22Conf);
     MRHelpers.doJobClientMagic(stage3Conf);
 
     Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
@@ -465,6 +459,7 @@ public class TestMRRJobsDAGApi {
     }
 
     byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+    byte[] stage2Payload = MRHelpers.createUserPayloadFromConf(stage2Conf);
     byte[] stage1InputPayload = MRHelpers.createMRInputPayload(stage1Payload, null);
     byte[] stage3Payload = MRHelpers.createUserPayloadFromConf(stage3Conf);
     
@@ -479,8 +474,7 @@ public class TestMRRJobsDAGApi {
         stage1NumTasks, Resource.newInstance(256, 1));
     MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, inputInitializerClazz);
     Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
-        ReduceProcessor.class.getName()).setUserPayload(
-        MRHelpers.createUserPayloadFromConf(stage2Conf)),
+        ReduceProcessor.class.getName()).setUserPayload(stage2Payload),
         1, Resource.newInstance(256, 1));
     Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
         ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
@@ -512,13 +506,13 @@ public class TestMRRJobsDAGApi {
     Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
         DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
         SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OnFileSortedOutput.class.getName()), new InputDescriptor(
-                ShuffledMergedInputLegacy.class.getName())));
+        OnFileSortedOutput.class.getName()).setUserPayload(stage2Payload), new InputDescriptor(
+                ShuffledMergedInputLegacy.class.getName()).setUserPayload(stage2Payload)));
     Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
         DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
         SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OnFileSortedOutput.class.getName()), new InputDescriptor(
-                ShuffledMergedInputLegacy.class.getName())));
+        OnFileSortedOutput.class.getName()).setUserPayload(stage3Payload), new InputDescriptor(
+                ShuffledMergedInputLegacy.class.getName()).setUserPayload(stage3Payload)));
 
     dag.addEdge(edge1);
     dag.addEdge(edge2);


Mime
View raw message