tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhiyu...@apache.org
Subject [04/50] [abbrv] tez git commit: TEZ-3244. Allow overlap of input and output memory when they are not concurrent. (jlowe)
Date Tue, 28 Mar 2017 19:47:45 GMT
TEZ-3244. Allow overlap of input and output memory when they are not concurrent. (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/63ae97d5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/63ae97d5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/63ae97d5

Branch: refs/heads/TEZ-1190
Commit: 63ae97d5f3fe6e30e3c5f7c9a892ef9902e83b39
Parents: b3a3af3
Author: Jason Lowe <jlowe@yahoo-inc.com>
Authored: Tue Feb 7 13:32:37 2017 -0600
Committer: Jason Lowe <jlowe@yahoo-inc.com>
Committed: Tue Feb 7 13:32:37 2017 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/tez/dag/api/TezConfiguration.java    |  30 ++++
 .../common/resources/MemoryDistributor.java     |  12 +-
 .../WeightedScalingMemoryDistributor.java       |  62 ++++++-
 .../TestWeightedScalingMemoryDistributor.java   | 165 +++++++++++++++++++
 5 files changed, 264 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16e239f..a7cc0ce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3244. Allow overlap of input and output memory when they are not concurrent
   TEZ-3581. Add different logger to enable suppressing logs for specific lines.
   TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels
   TEZ-3600. Fix flaky test: TestTokenCache
@@ -197,6 +198,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3244. Allow overlap of input and output memory when they are not concurrent
   TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels
   TEZ-3462. Task attempt failure during container shutdown loses useful container diagnostics
   TEZ-3574. Container reuse won't pickup extra dag level local resource.

http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index fd71b35..94f40bb 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -875,6 +875,36 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS =
       TEZ_TASK_PREFIX + "scale.memory.ratios";
 
+  /**
+   * Concurrent input/output memory allocation control. When enabled memory
+   * distributions assume that inputs and outputs will use their memory
+   * simultaneously. When disabled the distributions assume that outputs are not
+   * initialized until inputs release memory buffers, allowing inputs to
+   * leverage memory normally set aside for outputs and vice-versa.
+   * NOTE: This property currently is not supported by the ScalingAllocator
+   *       memory distributor.
+   */
+  @Private
+  @Unstable
+  @ConfigurationScope(Scope.VERTEX)
+  public static final String TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT =
+      TEZ_TASK_PREFIX + "scale.memory.input-output-concurrent";
+  public static final boolean TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT_DEFAULT = true;
+
+  /**
+   * Controls distributing output memory to inputs when non-concurrent I/O
+   * memory allocation is being used.  When enabled inputs will receive the
+   * same memory allocation as if concurrent I/O memory allocation were used.
+   * NOTE: This property currently is not supported by the ScalingAllocator
+   *       memory distributor.
+   */
+  @Private
+  @Unstable
+  @ConfigurationScope(Scope.VERTEX)
+  public static final String TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED =
+      TEZ_TASK_PREFIX + "scale.memory.non-concurrent-inputs.enabled";
+  public static final boolean TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED_DEFAULT = false;
+
   @Private
   @Unstable
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
index c822357..e63a414 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -61,6 +61,7 @@ public class MemoryDistributor {
 
   private long totalJvmMemory;
   private final boolean isEnabled;
+  private final boolean isInputOutputConcurrent;
   private final String allocatorClassName;
   private final Set<TaskContext> dupSet = Collections
       .newSetFromMap(new ConcurrentHashMap<TaskContext, Boolean>());
@@ -78,6 +79,9 @@ public class MemoryDistributor {
     this.conf = conf;
     isEnabled = conf.getBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED,
         TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED_DEFAULT);
+    isInputOutputConcurrent = conf.getBoolean(
+        TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT,
+        TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT_DEFAULT);
 
     if (isEnabled) {
       allocatorClassName = conf.get(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
@@ -213,9 +217,11 @@ public class MemoryDistributor {
     Preconditions.checkState(numAllocations == numRequestors,
         "Number of allocations must match number of requestors. Allocated=" + numAllocations
             + ", Requests: " + numRequestors);
-    Preconditions.checkState(totalAllocated <= totalJvmMemory,
-        "Total allocation should be <= availableMem. TotalAllocated: " + totalAllocated
-            + ", totalJvmMemory: " + totalJvmMemory);
+    if (isInputOutputConcurrent) {
+      Preconditions.checkState(totalAllocated <= totalJvmMemory,
+          "Total allocation should be <= availableMem. TotalAllocated: " + totalAllocated
+              + ", totalJvmMemory: " + totalJvmMemory);
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
index 8477300..c5b4fb0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.common.resources.InitialMemoryAllocator;
 import org.apache.tez.runtime.common.resources.InitialMemoryRequestContext;
+import org.apache.tez.runtime.common.resources.InitialMemoryRequestContext.ComponentType;
 import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
 import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
 import org.apache.tez.runtime.library.input.UnorderedKVInput;
@@ -129,9 +130,15 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
         + availableForAllocation + ", TotalRequested/TotalJVMHeap:"
         + new DecimalFormat("0.00").format(ratio));
 
+    int numInputRequestsScaled = 0;
+    int numOutputRequestsScaled = 0;
+    long totalInputAllocated = 0;
+    long totalOutputAllocated = 0;
+
     // Actual scaling
     List<Long> allocations = Lists.newArrayListWithCapacity(numRequests);
     for (Request request : requests) {
+      long allocated = 0;
       if (request.requestSize == 0) {
         allocations.add(0l);
         if (LOG.isDebugEnabled()) {
@@ -141,7 +148,7 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
       } else {
         double requestFactor = request.requestWeight / (double) numRequestsScaled;
         double scaledRequest = requestFactor * request.requestSize;
-        long allocated = Math.min(
+        allocated = Math.min(
             (long) ((scaledRequest / totalScaledRequest) * availableForAllocation),
             request.requestSize);
         // TODO Later - If requestedSize is used, the difference (allocated -
@@ -152,9 +159,52 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
               + request.requestType + " " + request.requestSize + "  to allocated: " + allocated);
         }
       }
+
+      if (request.componentType == ComponentType.INPUT) {
+        numInputRequestsScaled += request.requestWeight;
+        totalInputAllocated += allocated;
+      } else if (request.componentType == ComponentType.OUTPUT) {
+        numOutputRequestsScaled += request.requestWeight;
+        totalOutputAllocated += allocated;
+      }
+    }
+
+    if (!conf.getBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT,
+        TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT_DEFAULT)) {
+      adjustAllocationsForNonConcurrent(allocations, requests,
+          numInputRequestsScaled, totalInputAllocated,
+          numOutputRequestsScaled, totalOutputAllocated);
     }
+
     return allocations;
+  }
 
+  private void adjustAllocationsForNonConcurrent(List<Long> allocations,
+      List<Request> requests, int numInputsScaled, long totalInputAllocated,
+      int numOutputsScaled, long totalOutputAllocated) {
+    boolean inputsEnabled = conf.getBoolean(
+        TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED,
+        TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED_DEFAULT);
+    LOG.info("Adjusting scaled allocations for I/O non-concurrent."
+        + " numInputsScaled: {} InputAllocated: {} numOutputsScaled: {} outputAllocated: {} inputsEnabled: {}",
+        numInputsScaled, totalInputAllocated, numOutputsScaled, totalOutputAllocated, inputsEnabled);
+    for (int i = 0; i < requests.size(); i++) {
+      Request request = requests.get(i);
+      long additional = 0;
+      if (request.componentType == ComponentType.INPUT && inputsEnabled) {
+        double share = request.requestWeight / (double)numInputsScaled;
+        additional = (long) (totalOutputAllocated * share);
+      } else if (request.componentType == ComponentType.OUTPUT) {
+        double share = request.requestWeight / (double)numOutputsScaled;
+        additional = (long) (totalInputAllocated * share);
+      }
+      if (additional > 0) {
+        long newTotal = Math.min(allocations.get(i) + additional, request.requestSize);
+        // TODO Later - If requestedSize is used, the difference could be allocated to others.
+        allocations.set(i, newTotal);
+        LOG.debug("Adding {} to {} total={}", additional, request.componentClassname, newTotal);
+      }
+    }
   }
 
   private void initialProcessMemoryRequestContext(InitialMemoryRequestContext context) {
@@ -164,9 +214,10 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
     String className = context.getComponentClassName();
     requestType = getRequestTypeForClass(className);
     Integer typeScaleFactor = getScaleFactorForType(requestType);
+    ComponentType componentType = context.getComponentType();
 
-    Request request = new Request(context.getComponentClassName(), context.getRequestedSize(),
-        requestType, typeScaleFactor);
+    Request request = new Request(context.getComponentClassName(), componentType,
+        context.getRequestedSize(), requestType, typeScaleFactor);
     requests.add(request);
     numRequestsScaled += typeScaleFactor;
   }
@@ -293,14 +344,17 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
   }
 
   private static class Request {
-    Request(String componentClassname, long requestSize, RequestType requestType, int requestWeight) {
+    Request(String componentClassname, ComponentType componentType, long requestSize,
+        RequestType requestType, int requestWeight) {
       this.componentClassname = componentClassname;
+      this.componentType = componentType;
       this.requestSize = requestSize;
       this.requestType = requestType;
       this.requestWeight = requestWeight;
     }
 
     String componentClassname;
+    ComponentType componentType;
     long requestSize;
     private RequestType requestType;
     private int requestWeight;

http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
index a38497c..2fbe264 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -32,6 +33,7 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
 import org.apache.tez.runtime.library.input.UnorderedKVInput;
 import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
@@ -145,6 +147,169 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor
     assertEquals(1500, e4Callback.assigned);
   }
   
+  @Test(timeout = 5000)
+  public void testWeightedScalingNonConcurrent() throws TezException {
+    Configuration conf = new Configuration(this.conf);
+    conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT, false);
+    conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED, true);
+    conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, 0.2);
+    conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS,
+        WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 1, 2, 3, 1, 1));
+    System.err.println(Joiner.on(",").join(conf.getStringCollection(
+        TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS)));
+
+    MemoryDistributor dist = new MemoryDistributor(2, 2, conf);
+
+    dist.setJvmMemory(10000l);
+
+    // First request - ScatterGatherShuffleInput
+    MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+    InputContext e1InputContext1 = createTestInputContext();
+    InputDescriptor e1InDesc1 = createTestInputDescriptor(OrderedGroupedKVInput.class);
+    dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1);
+
+    // Second request - BroadcastInput
+    MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+    InputContext e2InputContext2 = createTestInputContext();
+    InputDescriptor e2InDesc2 = createTestInputDescriptor(UnorderedKVInput.class);
+    dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2);
+
+    // Third request - randomOutput (simulates MROutput)
+    MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest();
+    OutputContext e3OutputContext1 = createTestOutputContext();
+    OutputDescriptor e3OutDesc1 = createTestOutputDescriptor();
+    dist.requestMemory(10000, e3Callback, e3OutputContext1, e3OutDesc1);
+
+    // Fourth request - OnFileSortedOutput
+    MemoryUpdateCallbackForTest e4Callback = new MemoryUpdateCallbackForTest();
+    OutputContext e4OutputContext2 = createTestOutputContext();
+    OutputDescriptor e4OutDesc2 = createTestOutputDescriptor(OrderedPartitionedKVOutput.class);
+    dist.requestMemory(10000, e4Callback, e4OutputContext2, e4OutDesc2);
+
+    // Fifth request - Processor
+    MemoryUpdateCallbackForTest e5Callback = new MemoryUpdateCallbackForTest();
+    ProcessorContext e5ProcContext = createTestProcessortContext();
+    ProcessorDescriptor e5ProcDesc = createTestProcessorDescriptor();
+    dist.requestMemory(10000, e5Callback, e5ProcContext, e5ProcDesc);
+
+    dist.makeInitialAllocations();
+
+    // Total available: 80% of 10K = 8000
+    // 5 requests (weight) - 10K (3), 10K(1), 10K(1), 10K(2), 10K(1)
+    // Overlap input and output memory
+    assertEquals(5250, e1Callback.assigned);
+    assertEquals(1750, e2Callback.assigned);
+    assertEquals(2333, e3Callback.assigned);
+    assertEquals(4666, e4Callback.assigned);
+    assertEquals(1000, e5Callback.assigned);
+  }
+
+  @Test(timeout = 5000)
+  public void testAdditionalReserveFractionWeightedScalingNonConcurrent() throws TezException {
+    Configuration conf = new Configuration(this.conf);
+    conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT, false);
+    conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED, true);
+    conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS,
+        WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 2, 3, 6, 1, 1));
+    conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO, 0.025d);
+    conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX, 0.2d);
+
+    MemoryDistributor dist = new MemoryDistributor(2, 2, conf);
+
+    dist.setJvmMemory(10000l);
+
+    // First request - ScatterGatherShuffleInput [weight 6]
+    MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+    InputContext e1InputContext1 = createTestInputContext();
+    InputDescriptor e1InDesc1 = createTestInputDescriptor(OrderedGroupedKVInput.class);
+    dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1);
+
+    // Second request - BroadcastInput [weight 2]
+    MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+    InputContext e2InputContext2 = createTestInputContext();
+    InputDescriptor e2InDesc2 = createTestInputDescriptor(UnorderedKVInput.class);
+    dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2);
+
+    // Third request - randomOutput (simulates MROutput) [weight 1]
+    MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest();
+    OutputContext e3OutputContext1 = createTestOutputContext();
+    OutputDescriptor e3OutDesc1 = createTestOutputDescriptor();
+    dist.requestMemory(10000, e3Callback, e3OutputContext1, e3OutDesc1);
+
+    // Fourth request - OnFileSortedOutput [weight 3]
+    MemoryUpdateCallbackForTest e4Callback = new MemoryUpdateCallbackForTest();
+    OutputContext e4OutputContext2 = createTestOutputContext();
+    OutputDescriptor e4OutDesc2 = createTestOutputDescriptor(OrderedPartitionedKVOutput.class);
+    dist.requestMemory(10000, e4Callback, e4OutputContext2, e4OutDesc2);
+
+    dist.makeInitialAllocations();
+
+    // Total available: 60% of 10K = 6000
+    // 4 requests (weight) - 10K (6), 10K(2), 10K(1), 10K(3)
+    // Overlap input and output memory
+    assertEquals(4500, e1Callback.assigned);
+    assertEquals(1500, e2Callback.assigned);
+    assertEquals(1500, e3Callback.assigned);
+    assertEquals(4500, e4Callback.assigned);
+  }
+
+  @Test(timeout = 5000)
+  public void testWeightedScalingNonConcurrentInputsDisabled() throws TezException {
+    Configuration conf = new Configuration(this.conf);
+    conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT, false);
+    conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED, false);
+    conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, 0.2);
+    conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS,
+        WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 1, 2, 3, 1, 1));
+    System.err.println(Joiner.on(",").join(conf.getStringCollection(
+        TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS)));
+
+    MemoryDistributor dist = new MemoryDistributor(2, 2, conf);
+
+    dist.setJvmMemory(10000l);
+
+    // First request - ScatterGatherShuffleInput
+    MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+    InputContext e1InputContext1 = createTestInputContext();
+    InputDescriptor e1InDesc1 = createTestInputDescriptor(OrderedGroupedKVInput.class);
+    dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1);
+
+    // Second request - BroadcastInput
+    MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+    InputContext e2InputContext2 = createTestInputContext();
+    InputDescriptor e2InDesc2 = createTestInputDescriptor(UnorderedKVInput.class);
+    dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2);
+
+    // Third request - randomOutput (simulates MROutput)
+    MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest();
+    OutputContext e3OutputContext1 = createTestOutputContext();
+    OutputDescriptor e3OutDesc1 = createTestOutputDescriptor();
+    dist.requestMemory(10000, e3Callback, e3OutputContext1, e3OutDesc1);
+
+    // Fourth request - OnFileSortedOutput
+    MemoryUpdateCallbackForTest e4Callback = new MemoryUpdateCallbackForTest();
+    OutputContext e4OutputContext2 = createTestOutputContext();
+    OutputDescriptor e4OutDesc2 = createTestOutputDescriptor(OrderedPartitionedKVOutput.class);
+    dist.requestMemory(10000, e4Callback, e4OutputContext2, e4OutDesc2);
+
+    // Fifth request - Processor
+    MemoryUpdateCallbackForTest e5Callback = new MemoryUpdateCallbackForTest();
+    ProcessorContext e5ProcContext = createTestProcessortContext();
+    ProcessorDescriptor e5ProcDesc = createTestProcessorDescriptor();
+    dist.requestMemory(10000, e5Callback, e5ProcContext, e5ProcDesc);
+
+    dist.makeInitialAllocations();
+
+    // Total available: 80% of 10K = 8000
+    // 5 requests (weight) - 10K (3), 10K(1), 10K(1), 10K(2), 10K(1)
+    // Overlap input and output memory
+    assertEquals(3000, e1Callback.assigned);
+    assertEquals(1000, e2Callback.assigned);
+    assertEquals(2333, e3Callback.assigned);
+    assertEquals(4666, e4Callback.assigned);
+    assertEquals(1000, e5Callback.assigned);
+  }
+
   private static class MemoryUpdateCallbackForTest extends MemoryUpdateCallback {
 
     long assigned = -1000;


Mime
View raw message