tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [48/50] [abbrv] tez git commit: Merge branch 'master' into TEZ-3334-MERGE1
Date Wed, 24 May 2017 21:08:17 GMT
Merge branch 'master' into TEZ-3334-MERGE1


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a496252e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a496252e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a496252e

Branch: refs/heads/master
Commit: a496252e8d527689b7530094780fb28ce25b6b09
Parents: 251ca1c 1176733
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Fri May 19 14:57:08 2017 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Fri May 19 14:57:08 2017 -0500

----------------------------------------------------------------------
 docs/pom.xml                                    |   9 +
 .../apache/tez/dag/api/TezConfiguration.java    |  10 +
 .../tez/serviceplugins/api/TaskScheduler.java   |  15 +
 .../dag/app/TaskCommunicatorContextImpl.java    |  13 +-
 .../tez/dag/app/TaskCommunicatorManager.java    |   9 +-
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |   4 +-
 .../dag/app/dag/TaskAttemptStateInternal.java   |   1 +
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   7 +
 .../event/TaskAttemptEventStartedRemotely.java  |  33 +-
 .../dag/event/TaskAttemptEventSubmitted.java    |  49 ++
 .../dag/app/dag/event/TaskAttemptEventType.java |   1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 101 +++-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  20 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  75 ++-
 .../app/rm/AMSchedulerEventTAStateUpdated.java  |  42 ++
 .../tez/dag/app/rm/AMSchedulerEventType.java    |   1 +
 .../tez/dag/app/rm/TaskSchedulerManager.java    |  20 +
 .../tez/dag/app/rm/TaskSchedulerWrapper.java    |   4 +
 .../api/TaskCommunicatorContext.java            |  22 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 230 ++++++--
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |   9 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  19 +-
 .../TezTestServiceTaskCommunicatorImpl.java     |   5 +-
 .../tez-yarn-timeline-history-with-fs/pom.xml   |   5 +
 .../library/api/TezRuntimeConfiguration.java    |  11 +
 .../CartesianProductCombination.java            |   4 +-
 .../CartesianProductConfig.java                 |  45 +-
 .../CartesianProductEdgeManager.java            |  11 +-
 .../CartesianProductEdgeManagerConfig.java      |  67 ---
 .../CartesianProductEdgeManagerPartitioned.java |  31 +-
 .../CartesianProductEdgeManagerReal.java        |   3 +-
 ...artesianProductEdgeManagerUnpartitioned.java | 125 -----
 .../CartesianProductVertexManager.java          |  64 ++-
 .../CartesianProductVertexManagerConfig.java    |  77 ---
 ...artesianProductVertexManagerPartitioned.java |  38 +-
 .../CartesianProductVertexManagerReal.java      |   3 +-
 ...tesianProductVertexManagerUnpartitioned.java | 438 ---------------
 .../FairCartesianProductEdgeManager.java        | 174 ++++++
 .../FairCartesianProductVertexManager.java      | 551 +++++++++++++++++++
 .../library/common/shuffle/ShuffleUtils.java    |   2 +
 .../common/shuffle/impl/ShuffleManager.java     |  34 +-
 .../shuffle/orderedgrouped/MergeManager.java    |   5 +-
 .../orderedgrouped/ShuffleScheduler.java        |   2 +-
 .../common/sort/impl/TezIndexRecord.java        |   2 +-
 .../library/common/sort/impl/TezMerger.java     |  77 ++-
 .../writers/UnorderedPartitionedKVWriter.java   | 196 +++++--
 .../partitioner/RoundRobinPartitioner.java      |  30 +
 .../tez/runtime/library/utils/Grouper.java      |  66 +--
 .../main/proto/CartesianProductPayload.proto    |  11 +-
 .../src/main/proto/ShufflePayloads.proto        |   1 +
 .../TestCartesianProductCombination.java        |   2 +-
 .../TestCartesianProductConfig.java             |  37 +-
 .../TestCartesianProductEdgeManager.java        |   8 +-
 .../TestCartesianProductEdgeManagerConfig.java  |  53 --
 ...tCartesianProductEdgeManagerPartitioned.java |  77 +--
 ...artesianProductEdgeManagerUnpartitioned.java | 288 ----------
 .../TestCartesianProductVertexManager.java      |   2 +-
 ...TestCartesianProductVertexManagerConfig.java |  53 --
 ...artesianProductVertexManagerPartitioned.java |  26 +-
 ...tesianProductVertexManagerUnpartitioned.java | 460 ----------------
 .../TestFairCartesianProductEdgeManager.java    | 245 +++++++++
 .../TestFairCartesianProductVertexManager.java  | 500 +++++++++++++++++
 .../library/cartesianproduct/TestGrouper.java   |  36 +-
 .../common/shuffle/TestShuffleUtils.java        |   5 +-
 .../impl/TestShuffleInputEventHandlerImpl.java  |  14 +-
 .../TestUnorderedPartitionedKVWriter.java       | 117 +++-
 .../mapreduce/examples/CartesianProduct.java    | 385 +++++++++++++
 .../tez/mapreduce/examples/ExampleDriver.java   |   2 +
 .../org/apache/tez/test/TestFaultTolerance.java |   5 +-
 .../java/org/apache/tez/test/TestOutput.java    |  11 +
 .../java/org/apache/tez/test/TestTezJobs.java   |  13 +
 tez-ui/src/main/webapp/app/models/vertex.js     |  13 +-
 72 files changed, 3124 insertions(+), 2000 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index bc3ca0e,8716b92..81fac38
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@@ -475,10 -475,15 +485,16 @@@ public class ShuffleManager implements 
      if (inputHost.getNumPendingPartitions() > 0) {
        pendingHosts.add(inputHost); //add it to queue
      }
+     for(InputAttemptIdentifier input : pendingInputsOfOnePartition.getInputs()) {
+       ShuffleEventInfo eventInfo = shuffleInfoEventsMap.get(input.getInputIdentifier());
+       if (eventInfo != null) {
+         eventInfo.scheduledForDownload = true;
+       }
+     }
      fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
 -        pendingInputsOfOnePartition.getPartition(),
 -            pendingInputsOfOnePartition.getInputs());
 +        pendingInputsOfOnePartitionRange.getPartition(),
 +        pendingInputsOfOnePartitionRange.getPartitionCount(),
 +            pendingInputsOfOnePartitionRange.getInputs());
      if (LOG.isDebugEnabled()) {
        LOG.debug("Created Fetcher for host: " + inputHost.getHost()
            + ", info: " + inputHost.getAdditionalInfo()

http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index e6accda,af52f90..c934f6c
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@@ -224,10 -220,13 +224,13 @@@ public class TestShuffleInputEventHandl
      dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0);
      handler.handleEvents(Collections.singletonList(dme));
  
 -    InputAttemptIdentifier expectedId2 = new InputAttemptIdentifier(1, 0,
 -        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
 +    CompositeInputAttemptIdentifier expectedId2 = new CompositeInputAttemptIdentifier(1, 0,
 +        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1, 1);
      verify(shuffleManager, times(2)).addKnownInput(eq(HOST), eq(PORT), eq(expectedId2), eq(0));
  
+     // Let attemptNum 0 be scheduled.
+     shuffleManager.shuffleInfoEventsMap.get(expectedId2.getInputIdentifier()).scheduledForDownload = true;
+ 
      //0--> 1 with spill id 1 (attemptNum 1).  This should report exception
      dme = createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 1);
      handler.handleEvents(Collections.singletonList(dme));
@@@ -253,10 -252,13 +256,13 @@@
      Event dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1);
      handler.handleEvents(Collections.singletonList(dme));
  
 -    InputAttemptIdentifier expected = new InputAttemptIdentifier(1, 1,
 -        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
 +    CompositeInputAttemptIdentifier expected = new CompositeInputAttemptIdentifier(1, 1,
 +        PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1, 1);
      verify(shuffleManager, times(1)).addKnownInput(eq(HOST), eq(PORT), eq(expected), eq(0));
  
+     // Let attemptNum 1 be scheduled.
+     shuffleManager.shuffleInfoEventsMap.get(expected.getInputIdentifier()).scheduledForDownload = true;
+ 
      //Now send attemptNum 0.  This should throw exception, because attempt #1 is already added
      dme = createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0);
      handler.handleEvents(Collections.singletonList(dme));

http://git-wip-us.apache.org/repos/asf/tez/blob/a496252e/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 07feb20,6ea1562..27e7992
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@@ -162,11 -161,10 +163,12 @@@ public class TestUnorderedPartitionedKV
      ApplicationId appId = ApplicationId.newInstance(10000000, 1);
      TezCounters counters = new TezCounters();
      String uniqueId = UUID.randomUUID().toString();
 -    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
 +    String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
 +        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
 +    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
  
-     int maxSingleBufferSizeBytes = 2047;
+     final int maxSingleBufferSizeBytes = 2047;
+     final long sizePerBuffer = maxSingleBufferSizeBytes - 64 - maxSingleBufferSizeBytes % 4;
      Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
          false, maxSingleBufferSizeBytes);
  
@@@ -718,13 -774,14 +784,17 @@@
      ApplicationId appId = ApplicationId.newInstance(10000000, 1);
      TezCounters counters = new TezCounters();
      String uniqueId = UUID.randomUUID().toString();
 -    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
 +    int dagId = 1;
 +    String auxiliaryService = defaultConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
 +        TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
 +    OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, auxiliaryService);
  
      Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
-         shouldCompress, -1);
+         shouldCompress, maxSingleBufferSizeBytes);
+     conf.setInt(
+         TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT,
+         bufferMergePercent);
+ 
      CompressionCodec codec = null;
      if (shouldCompress) {
        codec = new DefaultCodec();


Mime
View raw message