tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-782. Scale I/O mem requirements if misconfigured. (sseth)
Date Wed, 12 Feb 2014 01:24:54 GMT
Updated Branches:
  refs/heads/master 5c5622e12 -> 505898934


TEZ-782. Scale I/O mem requirements if misconfigured. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/50589893
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/50589893
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/50589893

Branch: refs/heads/master
Commit: 505898934b61a35b264a1160971988b4860e044f
Parents: 5c5622e
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Feb 11 17:24:12 2014 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Feb 11 17:24:12 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |  18 +
 .../tez/runtime/api/MemoryUpdateCallback.java   |  34 ++
 .../apache/tez/runtime/api/TezTaskContext.java  |  19 +
 .../org/apache/tez/mapreduce/input/MRInput.java |   2 +
 .../apache/tez/mapreduce/output/MROutput.java   |   2 +
 tez-runtime-internals/pom.xml                   |   5 +
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  16 +-
 .../runtime/api/impl/TezInputContextImpl.java   |   7 +-
 .../runtime/api/impl/TezOutputContextImpl.java  |   8 +-
 .../api/impl/TezProcessorContextImpl.java       |   7 +-
 .../runtime/api/impl/TezTaskContextImpl.java    |  23 +-
 .../common/resources/MemoryDistributor.java     | 376 +++++++++++++++++++
 .../common/resources/TestMemoryDistributor.java | 260 +++++++++++++
 .../broadcast/input/BroadcastInputManager.java  |  12 +-
 .../input/BroadcastShuffleManager.java          |   2 +-
 .../common/shuffle/impl/MergeManager.java       |  58 ++-
 .../common/sort/impl/ExternalSorter.java        |  15 +
 .../common/sort/impl/PipelinedSorter.java       |   5 +-
 .../common/sort/impl/dflt/DefaultSorter.java    |   3 +-
 .../runtime/library/input/LocalMergedInput.java |   1 +
 .../library/output/OnFileSortedOutput.java      |   3 +-
 .../library/output/OnFileUnorderedKVOutput.java |   5 +-
 .../input/TestBroadcastInputManager.java        |  10 +-
 .../output/TestOnFileUnorderedKVOutput.java     |   9 +-
 .../java/org/apache/tez/test/TestInput.java     |   1 +
 .../java/org/apache/tez/test/TestOutput.java    |   1 +
 26 files changed, 855 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 6a67e70..4355cf5 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.api;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 
 public class TezConfiguration extends Configuration {
@@ -166,6 +167,23 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
       + "max-events-per-heartbeat.max";
   public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 100;
+  
+  /**
+   * Whether to scale down memory requested by each component if the total
+   * exceeds the available JVM memory
+   */
+  @Unstable
+  public static final String TEZ_TASK_SCALE_MEMORY_ENABLED = TEZ_TASK_PREFIX
+      + "scale.memory.enabled";
+  public static final boolean TEZ_TASK_SCALE_MEMORY_ENABLED_DEFAULT = true;
+  
+  /**
+   * The fraction of the JVM memory which will not be considered for allocation.
+   * No defaults, since there are pre-existing defaults based on different scenarios.
+   */
+  @Unstable
+  public static final String TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION = TEZ_TASK_PREFIX
+      + "scale.memory.reserve-fraction";
 
   public static final String TASK_TIMEOUT = TEZ_TASK_PREFIX + "timeout";
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-api/src/main/java/org/apache/tez/runtime/api/MemoryUpdateCallback.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MemoryUpdateCallback.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MemoryUpdateCallback.java
new file mode 100644
index 0000000..c2c89d8
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MemoryUpdateCallback.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * This interface will be used by Tez to inform components about available
+ * memory. This will typically be implemented by {@link Input}s, {@link Output}s
+ * and potentially {@link Processor}s
+ * 
+ */
+@Unstable
+public interface MemoryUpdateCallback {
+
+  public void memoryAssigned(long assignedSize);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
index f26316a..70ef41d 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
@@ -134,4 +134,23 @@ public interface TezTaskContext {
    * @return a ByteBuffer representing the meta-data
    */
   public ByteBuffer getServiceProviderMetaData(String serviceName);
+  
+  /**
+   * Request a specific amount of memory during initialization
+   * (initialize(..*Context)) The requester is notified of allocation via the
+   * provided callback handler.
+   * 
+   * Currently, (post TEZ-668) the caller will be informed about the available
+   * memory after initialization (I/P/O initialize(...)), and before the
+   * start/run invocation. There will be no other invocations on the callback.
+   * 
+   * This method can be called only once by any component. Calling it multiple
+   * times from within the same component will result in an error.
+   * 
+   * @param size
+   *          request size in bytes.
+   * @param callbackHandler
+   *          the callback handler to be invoked once memory is assigned
+   */
+  public void requestInitialMemory(long size, MemoryUpdateCallback callbackHandler);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 336968c..dda57fc 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -120,6 +120,8 @@ public class MRInput implements LogicalInput {
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws IOException {
     this.inputContext = inputContext;
+    // TEZ 815. Fix this. Not used until there's a separation between init and start.
+    inputContext.requestInitialMemory(0l, null);
     MRInputUserPayloadProto mrUserPayload =
       MRHelpers.parseMRInputPayload(inputContext.getUserPayload());
     Preconditions.checkArgument(mrUserPayload.hasSplits() == false,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 605efc8..e5d223d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -90,6 +90,8 @@ public class MROutput implements LogicalOutput {
   public List<Event> initialize(TezOutputContext outputContext)
       throws IOException, InterruptedException {
     LOG.info("Initializing Simple Output");
+    // TEZ 815. Fix this. Not used until there's a separation between init and start.
+    outputContext.requestInitialMemory(0l, null);
     taskNumberFormat.setMinimumIntegerDigits(5);
     taskNumberFormat.setGroupingUsed(false);
     nonTaskNumberFormat.setMinimumIntegerDigits(3);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-internals/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml
index f00e2ac..61ac67a 100644
--- a/tez-runtime-internals/pom.xml
+++ b/tez-runtime-internals/pom.xml
@@ -57,6 +57,11 @@
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index cb6287e..62de3ae 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -68,6 +68,7 @@ import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
 import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -96,6 +97,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private final ProcessorDescriptor processorDescriptor;
   private final LogicalIOProcessor processor;
   private TezProcessorContext processorContext;
+  
+  private final MemoryDistributor initialMemoryDistributor;
 
   /** Maps which will be provided to the processor run method */
   private final LinkedHashMap<String, LogicalInput> runInputMap;
@@ -136,7 +139,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
     this.state = State.NEW;
     this.appAttemptNumber = appAttemptNumber;
-    int numInitializers = numInputs + numOutputs + 1;
+    int numInitializers = numInputs + numOutputs; // Processor is initialized in the main thread.
     this.initializerExecutor = Executors.newFixedThreadPool(
         numInitializers,
         new ThreadFactoryBuilder().setDaemon(true)
@@ -144,6 +147,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.initializerCompletionService = new ExecutorCompletionService<Void>(
         this.initializerExecutor);
     this.groupInputSpecs = taskSpec.getGroupInputs();
+    initialMemoryDistributor = new MemoryDistributor(numInputs, numOutputs, tezConf);
   }
 
   public void initialize() throws Exception {
@@ -188,6 +192,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       }
     }
     LOG.info("All initializers finished");
+    initialMemoryDistributor.makeInitialAllocations();
 
     // group inputs depend on inputs beings initialized. So must be done after.
     initializeGroupInputs();
@@ -367,7 +372,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         inputSpec.getInputDescriptor().getUserPayload() == null ? taskSpec
             .getProcessorDescriptor().getUserPayload() : inputSpec
             .getInputDescriptor().getUserPayload(), this,
-        serviceConsumerMetadata, System.getenv());
+        serviceConsumerMetadata, System.getenv(), initialMemoryDistributor,
+        inputSpec.getInputDescriptor());
     return inputContext;
   }
 
@@ -379,7 +385,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         outputSpec.getOutputDescriptor().getUserPayload() == null ? taskSpec
             .getProcessorDescriptor().getUserPayload() : outputSpec
             .getOutputDescriptor().getUserPayload(), this,
-        serviceConsumerMetadata, System.getenv());
+        serviceConsumerMetadata, System.getenv(), initialMemoryDistributor,
+        outputSpec.getOutputDescriptor());
     return outputContext;
   }
 
@@ -387,7 +394,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     TezProcessorContext processorContext = new TezProcessorContextImpl(tezConf,
         appAttemptNumber, tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
         tezCounters, processorDescriptor.getUserPayload(), this,
-        serviceConsumerMetadata, System.getenv());
+        serviceConsumerMetadata, System.getenv(), initialMemoryDistributor,
+        processorDescriptor);
     return processorContext;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index 9849665..eeb7df0 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -26,11 +26,13 @@ import java.util.Map;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
 public class TezInputContextImpl extends TezTaskContextImpl
     implements TezInputContext {
@@ -46,10 +48,11 @@ public class TezInputContextImpl extends TezTaskContextImpl
       String sourceVertexName, TezTaskAttemptID taskAttemptID,
       TezCounters counters, int inputIndex, byte[] userPayload,
       RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata,
-      Map<String, String> auxServiceEnv) {
+      Map<String, String> auxServiceEnv, MemoryDistributor memDist,
+      InputDescriptor inputDescriptor) {
     super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
         counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
-        auxServiceEnv);
+        auxServiceEnv, memDist, inputDescriptor);
     this.userPayload = userPayload;
     this.inputIndex = inputIndex;
     this.sourceVertexName = sourceVertexName;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 5233db8..6f45d39 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -26,11 +26,13 @@ import java.util.Map;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
 public class TezOutputContextImpl extends TezTaskContextImpl
     implements TezOutputContext {
@@ -47,10 +49,11 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       TezTaskAttemptID taskAttemptID, TezCounters counters, int outputIndex,
       byte[] userPayload, RuntimeTask runtimeTask,
       Map<String, ByteBuffer> serviceConsumerMetadata,
-      Map<String, String> auxServiceEnv) {
+      Map<String, String> auxServiceEnv, MemoryDistributor memDist,
+      OutputDescriptor outputDescriptor) {
     super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
         counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
-        auxServiceEnv);
+        auxServiceEnv, memDist, outputDescriptor);
     this.userPayload = userPayload;
     this.outputIndex = outputIndex;
     this.destinationVertexName = destinationVertexName;
@@ -87,5 +90,4 @@ public class TezOutputContextImpl extends TezTaskContextImpl
   public int getOutputIndex() {
     return outputIndex;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index e307b33..7509cea 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -26,11 +26,13 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
 public class TezProcessorContextImpl extends TezTaskContextImpl
   implements TezProcessorContext {
@@ -43,10 +45,11 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
       TezTaskAttemptID taskAttemptID, TezCounters counters,
       byte[] userPayload, RuntimeTask runtimeTask,
       Map<String, ByteBuffer> serviceConsumerMetadata,
-      Map<String, String> auxServiceEnv) {
+      Map<String, String> auxServiceEnv, MemoryDistributor memDist,
+      ProcessorDescriptor processorDescriptor) {
     super(conf, appAttemptNumber, vertexName, taskAttemptID,
         counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
-        auxServiceEnv);
+        auxServiceEnv, memDist, processorDescriptor);
     this.userPayload = userPayload;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
         taskVertexName, "", taskAttemptID);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 158c080..c31e903 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -30,9 +30,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezEntityDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.TezTaskContext;
+import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
 public abstract class TezTaskContextImpl implements TezTaskContext {
 
@@ -49,13 +52,16 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
   private final Map<String, ByteBuffer> serviceConsumerMetadata;
   private final int appAttemptNumber;
   private final Map<String, String> auxServiceEnv;
+  protected final MemoryDistributor initialMemoryDistributor;
+  protected final TezEntityDescriptor descriptor;
 
   @Private
   public TezTaskContextImpl(Configuration conf, int appAttemptNumber,
       String taskVertexName, TezTaskAttemptID taskAttemptID,
       TezCounters counters, RuntimeTask runtimeTask,
       TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata,
-      Map<String, String> auxServiceEnv) {
+      Map<String, String> auxServiceEnv, MemoryDistributor memDist,
+      TezEntityDescriptor descriptor) {
     this.conf = conf;
     this.taskVertexName = taskVertexName;
     this.taskAttemptID = taskAttemptID;
@@ -71,6 +77,8 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
     this.auxServiceEnv = auxServiceEnv;
     this.uniqueIdentifier = String.format("%s_%05d", taskAttemptID.toString(),
         generateId());
+    this.initialMemoryDistributor = memDist;
+    this.descriptor = descriptor;
   }
 
   @Override
@@ -138,6 +146,17 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
         serviceName, auxServiceEnv);
   }
 
+  @Override
+  public void requestInitialMemory(long size, MemoryUpdateCallback callbackHandler) {
+    this.initialMemoryDistributor.requestMemory(size, new MemoryUpdateCallback() {
+      @Override
+      public void memoryAssigned(long assignedSize) {
+        // Filler for this patch.
+        // TODO TEZ-815 implement this properly.
+      }
+    }, this, descriptor);
+  }
+
   protected void signalFatalError(Throwable t, String message,
       EventMetaData sourceInfo) {
     runtimeTask.setFatalError(t, message);
@@ -154,7 +173,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
     }
     tezUmbilical.signalFatalError(taskAttemptID, diagnostics, sourceInfo);
   }
-  
+
   private int generateId() {
     return ID_GEN.incrementAndGet();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
new file mode 100644
index 0000000..df71447
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.common.resources;
+
+import java.text.DecimalFormat;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezEntityDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.TezTaskContext;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+// Not calling this a MemoryManager explicitly. Not yet anyway.
+@Private
+public class MemoryDistributor {
+
+  private static final Log LOG = LogFactory.getLog(MemoryDistributor.class);
+
+  private final int numTotalInputs;
+  private final int numTotalOutputs;
+  
+  private int numInputsSeen = 0;
+  private int numOutputsSeen = 0;
+
+  private long totalJvmMemory;
+  private long totalAssignableMemory;
+  private final boolean isEnabled;
+  private final boolean reserveFractionConfigured;
+  private float reserveFraction;
+  private final Set<TezTaskContext> dupSet = Sets.newConcurrentHashSet();
+  private final List<RequestorInfo> requestList;
+  
+  // Maybe make the reserve fraction configurable. Or scale it based on JVM heap.
+  @VisibleForTesting
+  static final float RESERVE_FRACTION_NO_PROCESSOR = 0.3f;
+  @VisibleForTesting
+  static final float RESERVE_FRACTION_WITH_PROCESSOR = 0.05f;
+
+  /**
+   * @param numInputs
+   *          total number of Inputs for the task
+   * @param numOutputs
+   *          total number of Outputs for the task
+   * @param conf
+   *          Tez specific task configuration
+   */
+  public MemoryDistributor(int numTotalInputs, int numTotalOutputs, Configuration conf) {
+    isEnabled = conf.getBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED,
+        TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED_DEFAULT);
+    if (conf.get(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION) != null) {
+      reserveFractionConfigured = true;
+      reserveFraction = conf.getFloat(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION,
+          RESERVE_FRACTION_NO_PROCESSOR);
+      Preconditions.checkArgument(reserveFraction >= 0.0f && reserveFraction <= 1.0f);
+    } else {
+      reserveFractionConfigured = false;
+      reserveFraction = RESERVE_FRACTION_NO_PROCESSOR;
+    }
+
+    this.numTotalInputs = numTotalInputs;
+    this.numTotalOutputs = numTotalOutputs;
+    this.totalJvmMemory = Runtime.getRuntime().maxMemory();
+    computeAssignableMemory();
+    this.requestList = Collections.synchronizedList(new LinkedList<RequestorInfo>());
+    LOG.info("InitialMemoryDistributor (isEnabled=" + isEnabled + ") invoked with: numInputs="
+        + numTotalInputs + ", numOutputs=" + numTotalOutputs
+        + ". Configuration: reserveFractionSpecified= " + reserveFractionConfigured
+        + ", reserveFraction=" + reserveFraction + ", JVM.maxFree=" + totalJvmMemory
+        + ", assignableMemory=" + totalAssignableMemory);
+  }
+
+
+  
+  /**
+   * Used by the Tez framework to request memory on behalf of user requests.
+   */
+  public void requestMemory(long requestSize, MemoryUpdateCallback callback,
+      TezTaskContext taskContext, TezEntityDescriptor descriptor) {
+    registerRequest(requestSize, callback, taskContext, descriptor);
+  }
+  
+  /**
+   * Used by the Tez framework to distribute initial memory after components
+   * have made their initial requests.
+   */
+  public void makeInitialAllocations() {
+    Preconditions.checkState(numInputsSeen == numTotalInputs, "All inputs are expected to ask for memory");
+    Preconditions.checkState(numOutputsSeen == numTotalOutputs, "All outputs are expected to ask for memory");
+    Iterable<RequestContext> requestContexts = Iterables.transform(requestList,
+        new Function<RequestorInfo, RequestContext>() {
+          public RequestContext apply(RequestorInfo requestInfo) {
+            return requestInfo.getRequestContext();
+          }
+        });
+
+    Iterable<Long> allocations = null;
+    if (!isEnabled) {
+      allocations = Iterables.transform(requestList, new Function<RequestorInfo, Long>() {
+        public Long apply(RequestorInfo requestInfo) {
+          return requestInfo.getRequestContext().getRequestedSize();
+        }
+      });
+    } else {
+      InitialMemoryAllocator allocator = new ScalingAllocator();
+      allocations = allocator.assignMemory(totalAssignableMemory, numTotalInputs, numTotalOutputs,
+          Iterables.unmodifiableIterable(requestContexts));
+      validateAllocations(allocations, requestList.size());
+    }
+
+    // Making the callbacks directly for now, instead of spawning threads. The
+    // callback implementors - all controlled by Tez at the moment are
+    // lightweight.
+    Iterator<Long> allocatedIter = allocations.iterator();
+    for (RequestorInfo rInfo : requestList) {
+      long allocated = allocatedIter.next();
+      LOG.info("Informing: " + rInfo.getRequestContext().getComponentType() + ", "
+          + rInfo.getRequestContext().getComponentVertexName() + ", "
+          + rInfo.getRequestContext().getComponentClassName() + ": requested="
+          + rInfo.getRequestContext().getRequestedSize() + ", allocated=" + allocated);
+      rInfo.getCallback().memoryAssigned(allocated);
+    }
+  }
+
+  /**
+   * Allow tests to set memory.
+   * @param size
+   */
+  @Private
+  @VisibleForTesting
+  void setJvmMemory(long size) {
+    this.totalJvmMemory = size;
+    computeAssignableMemory();
+  }
+  
+  private void computeAssignableMemory() {
+    this.totalAssignableMemory = totalJvmMemory - ((long) (reserveFraction * totalJvmMemory));
+  }
+
+  private long registerRequest(long requestSize, MemoryUpdateCallback callback,
+      TezTaskContext entityContext, TezEntityDescriptor descriptor) {
+    Preconditions.checkArgument(requestSize >= 0);
+    Preconditions.checkNotNull(callback);
+    Preconditions.checkNotNull(entityContext);
+    Preconditions.checkNotNull(descriptor);
+    if (!dupSet.add(entityContext)) {
+      throw new TezUncheckedException(
+          "A single entity can only make one call to request resources for now");
+    }
+
+    RequestorInfo requestInfo = new RequestorInfo(entityContext,requestSize, callback, descriptor);
+    switch (requestInfo.getRequestContext().getComponentType()) {
+    case INPUT:
+      numInputsSeen++;
+      Preconditions.checkState(numInputsSeen <= numTotalInputs,
+          "Num Requesting Inputs higher than total # of inputs: " + numInputsSeen + ", "
+              + numTotalInputs);
+      break;
+    case OUTPUT:
+      numOutputsSeen++;
+      Preconditions.checkState(numOutputsSeen <= numTotalOutputs,
+          "Num Requesting Inputs higher than total # of outputs: " + numOutputsSeen + ", "
+              + numTotalOutputs);
+    case PROCESSOR:
+      break;
+    default:
+      break;
+    }
+    requestList.add(requestInfo);
+    if (!reserveFractionConfigured
+        && requestInfo.getRequestContext().getComponentType() == RequestContext.ComponentType.PROCESSOR) {
+      reserveFraction = RESERVE_FRACTION_WITH_PROCESSOR;
+      computeAssignableMemory();
+      LOG.info("Processor request for initial memory. Updating assignableMemory to : "
+          + totalAssignableMemory);
+    }
+    return -1;
+  }
+
+  private void validateAllocations(Iterable<Long> allocations, int numRequestors) {
+    Preconditions.checkNotNull(allocations);
+    long totalAllocated = 0l;
+    int numAllocations = 0;
+    for (Long l : allocations) {
+      totalAllocated += l;
+      numAllocations++;
+    }
+    Preconditions.checkState(numAllocations == numRequestors,
+        "Number of allocations must match number of requestors. Allocated=" + numAllocations
+            + ", Requests: " + numRequestors);
+    Preconditions.checkState(totalAllocated <= totalAssignableMemory,
+        "Total allocation should be <= availableMem. TotalAllocated: " + totalAllocated
+            + ", totalAssignable: " + totalAssignableMemory);
+  }
+
+  /**
+   * Used to balance memory requests before a task starts executing.
+   */
+  private static interface InitialMemoryAllocator {
+    /**
+     * @param availableForAllocation
+     *          memory available for allocation
+     * @param numTotalInputs
+     *          number of inputs for the task
+     * @param numTotalOutputs
+     *          number of outputs for the tasks
+     * @param requests
+     *          Iterable view of requests received
+     * @return list of allocations, one per request. This must be ordered in the
+     *         same order of the requests.
+     */
+    Iterable<Long> assignMemory(long availableForAllocation, int numTotalInputs,
+        int numTotalOutputs, Iterable<RequestContext> requests);
+  }
+
+  // Make this a public class if pulling the interface out.
+  // Custom allocator based on The classes being used. Broadcast typically needs
+  // a lot less than sort etc.
+  private static class RequestContext {
+
+    private static enum ComponentType {
+      INPUT, OUTPUT, PROCESSOR
+    }
+
+    private long requestedSize;
+    private String componentClassName;
+    private ComponentType componentType;
+    private String componentVertexName;
+
+    public RequestContext(long requestedSize, String componentClassName,
+        ComponentType componentType, String componentVertexName) {
+      this.requestedSize = requestedSize;
+      this.componentClassName = componentClassName;
+      this.componentType = componentType;
+      this.componentVertexName = componentVertexName;
+    }
+
+    public long getRequestedSize() {
+      return requestedSize;
+    }
+
+    public String getComponentClassName() {
+      return componentClassName;
+    }
+
+    public ComponentType getComponentType() {
+      return componentType;
+    }
+
+    public String getComponentVertexName() {
+      return componentVertexName;
+    }
+  }
+
+  @Private
+  private static class RequestorInfo {
+    private final MemoryUpdateCallback callback;
+    private final RequestContext requestContext;
+
+    RequestorInfo(TezTaskContext taskContext, long requestSize,
+        final MemoryUpdateCallback callback, TezEntityDescriptor descriptor) {
+      RequestContext.ComponentType type;
+      String componentVertexName;
+      if (taskContext instanceof TezInputContext) {
+        type = RequestContext.ComponentType.INPUT;
+        componentVertexName = ((TezInputContext) taskContext).getSourceVertexName();
+      } else if (taskContext instanceof TezOutputContext) {
+        type = RequestContext.ComponentType.OUTPUT;
+        componentVertexName = ((TezOutputContext) taskContext).getDestinationVertexName();
+      } else if (taskContext instanceof TezProcessorContext) {
+        type = RequestContext.ComponentType.PROCESSOR;
+        componentVertexName = ((TezProcessorContext) taskContext).getTaskVertexName();
+      } else {
+        throw new IllegalArgumentException("Unknown type of entityContext: "
+            + taskContext.getClass().getName());
+      }
+      this.requestContext = new RequestContext(requestSize, descriptor.getClassName(), type,
+          componentVertexName);
+      this.callback = callback;
+      LOG.info("Received request: " + requestSize + ", type: " + type
+          + ", componentVertexName: " + componentVertexName);
+    }
+
+    public MemoryUpdateCallback getCallback() {
+      return callback;
+    }
+
+    public RequestContext getRequestContext() {
+      return requestContext;
+    }
+  }
+
+  private static class ScalingAllocator implements InitialMemoryAllocator {
+
+    @Override
+    public Iterable<Long> assignMemory(long availableForAllocation, int numTotalInputs,
+        int numTotalOutputs, Iterable<RequestContext> requests) {
+      int numRequests = 0;
+      long totalRequested = 0;
+      for (RequestContext context : requests) {
+        totalRequested += context.getRequestedSize();
+        numRequests++;
+      }
+
+      long totalJvmMem = Runtime.getRuntime().maxMemory();
+      double ratio = totalRequested / (double) totalJvmMem;
+      LOG.info("Scaling Requests. TotalRequested: " + totalRequested + ", TotalJVMMem: "
+          + totalJvmMem + ", TotalAvailable: " + availableForAllocation
+          + ", TotalRequested/TotalHeap:" + new DecimalFormat("0.00").format(ratio));
+
+      if (totalRequested < availableForAllocation || totalRequested == 0) {
+        // Not scaling up requests. Assuming things were setup correctly by
+        // users in this case, keeping Processor, caching etc in mind.
+        return Lists.newArrayList(Iterables.transform(requests,
+            new Function<RequestContext, Long>() {
+              public Long apply(RequestContext requestContext) {
+                return requestContext.getRequestedSize();
+              }
+            }));
+      }
+
+      List<Long> allocations = Lists.newArrayListWithCapacity(numRequests);
+      for (RequestContext request : requests) {
+        long requestedSize = request.getRequestedSize();
+        if (requestedSize == 0) {
+          allocations.add(0l);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Scaling requested: 0 to allocated: 0");
+          }
+        } else {
+          long allocated = (long) ((requestedSize / (double) totalRequested) * availableForAllocation);
+          allocations.add(allocated);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Scaling requested: " + requestedSize + " to allocated: " + allocated);  
+          }
+          
+        }
+      }
+      return allocations;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/resources/TestMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/resources/TestMemoryDistributor.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/resources/TestMemoryDistributor.java
new file mode 100644
index 0000000..f5bc8d6
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/resources/TestMemoryDistributor.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.common.resources;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.junit.Test;
+
+public class TestMemoryDistributor {
+
+
+  @Test(timeout = 5000)
+  public void testScalingNoProcessor() {
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED, true);
+    MemoryDistributor dist = new MemoryDistributor(2, 1, conf);
+    
+    dist.setJvmMemory(10000l);
+
+    // First request
+    MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+    TezInputContext e1InputContext1 = createTestInputContext();
+    InputDescriptor e1InDesc1 = createTestInputDescriptor();
+    dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1);
+    
+    // Second request
+    MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+    TezInputContext e2InputContext2 = createTestInputContext();
+    InputDescriptor e2InDesc2 = createTestInputDescriptor();
+    dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2);
+    
+    // Third request - output
+    MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest();
+    TezOutputContext e3OutputContext1 = createTestOutputContext();
+    OutputDescriptor e3OutDesc2 = createTestOutputDescriptor();
+    dist.requestMemory(5000, e3Callback, e3OutputContext1, e3OutDesc2);
+    
+    dist.makeInitialAllocations();
+    
+    // Total available: 70% of 10K = 7000
+    // 3 requests - 10K, 10K, 5K
+    // Scale down to - 2800, 2800, 1400
+    assertEquals(2800, e1Callback.assigned);
+    assertEquals(2800, e2Callback.assigned);
+    assertEquals(1400, e3Callback.assigned);
+  }
+  
+  @Test(timeout = 5000)
+  public void testScalingNoProcessor2() {
+    // Real world values
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED, true);
+    MemoryDistributor dist = new MemoryDistributor(2, 0, conf);
+    
+    dist.setJvmMemory(207093760l);
+
+    // First request
+    MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+    TezInputContext e1InputContext1 = createTestInputContext();
+    InputDescriptor e1InDesc1 = createTestInputDescriptor();
+    dist.requestMemory(104857600l, e1Callback, e1InputContext1, e1InDesc1);
+    
+    // Second request
+    MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+    TezInputContext e2InputContext2 = createTestInputContext();
+    InputDescriptor e2InDesc2 = createTestInputDescriptor();
+    dist.requestMemory(144965632l, e2Callback, e2InputContext2, e2InDesc2);
+    
+    dist.makeInitialAllocations();
+
+    assertEquals(60846013, e1Callback.assigned);
+    assertEquals(84119614, e2Callback.assigned);
+  }
+  
+  @Test(timeout = 5000)
+  public void testScalingProcessor() {
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED, true);
+    MemoryDistributor dist = new MemoryDistributor(2, 1, conf);
+    
+    dist.setJvmMemory(10000l);
+
+    // First request
+    MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+    TezInputContext e1InputContext1 = createTestInputContext();
+    InputDescriptor e1InDesc1 = createTestInputDescriptor();
+    dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1);
+    
+    // Second request
+    MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+    TezInputContext e2InputContext2 = createTestInputContext();
+    InputDescriptor e2InDesc2 = createTestInputDescriptor();
+    dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2);
+    
+    // Third request - output
+    MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest();
+    TezOutputContext e3OutputContext1 = createTestOutputContext();
+    OutputDescriptor e3OutDesc1 = createTestOutputDescriptor();
+    dist.requestMemory(5000, e3Callback, e3OutputContext1, e3OutDesc1);
+    
+    // Fourth request - processor
+    MemoryUpdateCallbackForTest e4Callback = new MemoryUpdateCallbackForTest();
+    TezProcessorContext e4ProcessorContext1 = createTestProcessortContext();
+    ProcessorDescriptor e4ProcessorDesc1 = createTestProcessorDescriptor();
+    dist.requestMemory(5000, e4Callback, e4ProcessorContext1, e4ProcessorDesc1);
+    
+    
+    dist.makeInitialAllocations();
+    
+    // Total available: 95% of 10K = 9500
+    // 4 requests - 10K, 10K, 5K, 5K
+    // Scale down to - 3166.66, 3166.66, 1583.33, 1583.33
+    assertTrue(e1Callback.assigned >= 3166 && e1Callback.assigned <= 3177);
+    assertTrue(e2Callback.assigned >= 3166 && e2Callback.assigned <= 3177);
+    assertTrue(e3Callback.assigned >= 1583 && e3Callback.assigned <= 1583);
+    assertTrue(e4Callback.assigned >= 1583 && e4Callback.assigned <= 1583);
+  }
+  
+  @Test(timeout = 5000)
+  public void testScalingDisabled() {
+    // Real world values
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED, false);
+    MemoryDistributor dist = new MemoryDistributor(2, 0, conf);
+    
+    dist.setJvmMemory(207093760l);
+
+    // First request
+    MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+    TezInputContext e1InputContext1 = createTestInputContext();
+    InputDescriptor e1InDesc1 = createTestInputDescriptor();
+    dist.requestMemory(104857600l, e1Callback, e1InputContext1, e1InDesc1);
+    
+    // Second request
+    MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+    TezInputContext e2InputContext2 = createTestInputContext();
+    InputDescriptor e2InDesc2 = createTestInputDescriptor();
+    dist.requestMemory(144965632l, e2Callback, e2InputContext2, e2InDesc2);
+    
+    dist.makeInitialAllocations();
+
+    assertEquals(104857600l, e1Callback.assigned);
+    assertEquals(144965632l, e2Callback.assigned);
+  }
+  
+  @Test(timeout = 5000)
+  public void testReserveFractionConfigured() {
+    Configuration conf = new Configuration();
+    conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED, true);
+    conf.setFloat(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, 0.5f);
+    MemoryDistributor dist = new MemoryDistributor(2, 1, conf);
+    
+    dist.setJvmMemory(10000l);
+
+    // First request
+    MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+    TezInputContext e1InputContext1 = createTestInputContext();
+    InputDescriptor e1InDesc1 = createTestInputDescriptor();
+    dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1);
+    
+    // Second request
+    MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+    TezInputContext e2InputContext2 = createTestInputContext();
+    InputDescriptor e2InDesc2 = createTestInputDescriptor();
+    dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2);
+    
+    // Third request - output
+    MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest();
+    TezOutputContext e3OutputContext1 = createTestOutputContext();
+    OutputDescriptor e3OutDesc2 = createTestOutputDescriptor();
+    dist.requestMemory(5000, e3Callback, e3OutputContext1, e3OutDesc2);
+    
+    dist.makeInitialAllocations();
+    
+    // Total available: 50% of 10K = 7000
+    // 3 requests - 10K, 10K, 5K
+    // Scale down to - 2000, 2000, 1000
+    assertEquals(2000, e1Callback.assigned);
+    assertEquals(2000, e2Callback.assigned);
+    assertEquals(1000, e3Callback.assigned);
+  }
+  
+  
+  private static class MemoryUpdateCallbackForTest implements MemoryUpdateCallback {
+
+    long assigned = -1000;
+
+    @Override
+    public void memoryAssigned(long assignedSize) {
+      this.assigned = assignedSize;
+    }
+  }
+
+  private InputDescriptor createTestInputDescriptor() {
+    InputDescriptor desc = mock(InputDescriptor.class);
+    doReturn("InputClass").when(desc).getClassName();
+    return desc;
+  }
+
+  private OutputDescriptor createTestOutputDescriptor() {
+    OutputDescriptor desc = mock(OutputDescriptor.class);
+    doReturn("OutputClass").when(desc).getClassName();
+    return desc;
+  }
+
+  private ProcessorDescriptor createTestProcessorDescriptor() {
+    ProcessorDescriptor desc = mock(ProcessorDescriptor.class);
+    doReturn("ProcessorClass").when(desc).getClassName();
+    return desc;
+  }
+
+  private TezInputContext createTestInputContext() {
+    TezInputContext context = mock(TezInputContext.class);
+    doReturn("input").when(context).getSourceVertexName();
+    doReturn("task").when(context).getTaskVertexName();
+    return context;
+  }
+  
+  private TezOutputContext createTestOutputContext() {
+    TezOutputContext context = mock(TezOutputContext.class);
+    doReturn("output").when(context).getDestinationVertexName();
+    doReturn("task").when(context).getTaskVertexName();
+    return context;
+  }
+  
+  private TezProcessorContext createTestProcessortContext() {
+    TezProcessorContext context = mock(TezProcessorContext.class);
+    doReturn("task").when(context).getTaskVertexName();
+    return context;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
index 456b8b6..6c5a3da 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastInputManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
@@ -51,7 +52,7 @@ public class BroadcastInputManager implements FetchedInputAllocator,
 
   private volatile long usedMemory = 0;
 
-  public BroadcastInputManager(String uniqueIdentifier, Configuration conf) {
+  public BroadcastInputManager(String uniqueIdentifier, Configuration conf, TezInputContext inputContext) {
     this.conf = conf;
 
     this.fileNameAllocator = new TezTaskOutputFiles(conf,
@@ -69,8 +70,15 @@ public class BroadcastInputManager implements FetchedInputAllocator,
     }
 
     // Allow unit tests to fix Runtime memory
-    this.memoryLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+    long memReq = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
         Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
+    
+    // TEZ 815. Fix this. Not used until there's a separation between init and start.
+    inputContext.requestInitialMemory(memReq, null);
+    long memAlloc = memReq;
+    
+    this.memoryLimit = memAlloc;
+    LOG.info("RequestedMem=" + memReq + ", Allocated: " + this.memoryLimit);
 
     final float singleShuffleMemoryLimitPercent = conf.getFloat(
         TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
index 47eaaf6..e4c31cf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleManager.java
@@ -152,7 +152,7 @@ public class BroadcastShuffleManager implements FetcherCallback {
     this.ifileBufferSize = conf.getInt("io.file.buffer.size",
         TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
     
-    this.inputManager = new BroadcastInputManager(inputContext.getUniqueIdentifier(), conf);
+    this.inputManager = new BroadcastInputManager(inputContext.getUniqueIdentifier(), conf, inputContext);
     this.inputEventHandler = new BroadcastShuffleInputEventHandler(
         inputContext, this, this.inputManager, codec, ifileReadAhead,
         ifileReadAheadLength);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
index dfee47b..37808c3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MergeManager.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.Constants;
@@ -113,6 +114,8 @@ public class MergeManager {
   private final int ifileReadAheadLength;
   private final int ifileBufferSize;
 
+  private final int postMergeMemLimit;
+  
   public MergeManager(Configuration conf, 
                       FileSystem localFS,
                       LocalDirAllocator localDirAllocator,  
@@ -168,11 +171,41 @@ public class MergeManager {
     }
 
     // Allow unit tests to fix Runtime memory
-    this.memoryLimit = 
-      (long)(conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
-          Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
-        * maxInMemCopyUse);
- 
+    long memLimit = (long) (conf.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY,
+        Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE)) * maxInMemCopyUse);
+    
+    LOG.info("Shuffle Memory Required: " + memLimit + ", based on INPUT_BUFFER_factor: " + maxInMemCopyUse);
+
+    float maxRedPer = conf.getFloat(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT,
+        TezJobConfig.DEFAULT_TEZ_RUNTIME_INPUT_BUFFER_PERCENT);
+    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
+      throw new TezUncheckedException(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT + maxRedPer);
+    }
+    // TODO maxRedBuffer should be a long.
+    int maxRedBuffer = (int) Math.min(Runtime.getRuntime().maxMemory() * maxRedPer,
+        Integer.MAX_VALUE);
+    LOG.info("Memory required for final merged output: " + maxRedBuffer + ", using factor: " + maxRedPer);
+
+    long reqMem = Math.max(maxRedBuffer, memLimit);
+    // TEZ 815. Fix this. Not used until there's a separation between init and start.
+    inputContext.requestInitialMemory(reqMem, null);
+    long availableMem = reqMem;
+    
+    if (availableMem < memLimit) {
+      this.memoryLimit = availableMem;
+    } else {
+      this.memoryLimit = memLimit;
+    }
+    
+    if (availableMem < maxRedBuffer) {
+      this.postMergeMemLimit = (int) availableMem;
+    } else {
+      this.postMergeMemLimit = maxRedBuffer;
+    }
+    
+    LOG.info("FinalMemoryAllocation: ShuffleMemory=" + this.memoryLimit + ", postMergeMem: "
+        + this.postMergeMemLimit);
+
     this.ioSortFactor = 
         conf.getInt(
             TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR, 
@@ -663,18 +696,7 @@ public class MergeManager {
              inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
              onDiskMapOutputs.size() + " on-disk map-outputs");
     
-    final float maxRedPer =
-      job.getFloat(
-          TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT,
-          TezJobConfig.DEFAULT_TEZ_RUNTIME_INPUT_BUFFER_PERCENT);
-    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
-      throw new IOException(TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT +
-                            maxRedPer);
-    }
-    int maxInMemReduce = (int)Math.min(
-        Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
-    LOG.info("Memory allocated for final merge output: " + maxInMemReduce + ", using factor: "
-        + maxRedPer);
+    
     
 
     // merge config params
@@ -692,7 +714,7 @@ public class MergeManager {
       int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getSrcTaskIndex();
       inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
                                                 memDiskSegments,
-                                                maxInMemReduce);
+                                                this.postMergeMemLimit);
       final int numMemDiskSegments = memDiskSegments.size();
       if (numMemDiskSegments > 0 &&
             ioSortFactor > onDiskMapOutputs.size()) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index e7519c9..e8a78fa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -81,6 +81,8 @@ public abstract class ExternalSorter {
   protected boolean ifileReadAhead;
   protected int ifileReadAheadLength;
   protected int ifileBufferSize;
+  
+  protected int availableMemory;
 
   protected IndexedSorter sorter;
 
@@ -101,6 +103,19 @@ public abstract class ExternalSorter {
 
     rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
 
+    int reqMemory = 
+        this.conf.getInt(
+            TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 
+            TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_MB);
+    long reqBytes = reqMemory << 20;
+    // TEZ 815. Fix this. Not used until there's a separation between init and start.
+    outputContext.requestInitialMemory(reqBytes, null);
+    long availBytes = reqBytes;
+    
+    this.availableMemory = (int) (availBytes >> 20);
+    
+    LOG.info("io.sort.mb requested: " + reqMemory + ", Allocated: " + availableMemory);
+    
     // sorter
     sorter = ReflectionUtils.newInstance(this.conf.getClass(
         TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS, QuickSort.class,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index f556e22..3237522 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -102,10 +102,7 @@ public class PipelinedSorter extends ExternalSorter {
       this.conf.getFloat(
           TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT, 
           TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_SPILL_PERCENT);
-    final int sortmb = 
-        this.conf.getInt(
-            TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, 
-            TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_MB);
+    final int sortmb = this.availableMemory;
     indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
                                        TezJobConfig.DEFAULT_TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
     if (spillper > (float)1.0 || spillper <= (float)0.0) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 778fd3d..e831dec 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -118,8 +118,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     final float spillper = this.conf.getFloat(
         TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT,
         TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_SPILL_PERCENT);
-    final int sortmb = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB,
-        TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_MB);
+    final int sortmb = this.availableMemory;
     if (spillper > (float) 1.0 || spillper <= (float) 0.0) {
       throw new IOException("Invalid \""
           + TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT + "\": " + spillper);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
index 683578e..b8191db 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -36,6 +36,7 @@ public class LocalMergedInput extends ShuffledMergedInputLegacy {
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws IOException {
     this.inputContext = inputContext;
+    this.inputContext.requestInitialMemory(0l, null); // mandatory call. Fix in TEZ-815
     this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
 
     if (numInputs == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 05d37ed..3b69e09 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -47,7 +47,7 @@ import com.google.common.collect.Lists;
  * written to it and persists it to a file.
  */
 public class OnFileSortedOutput implements LogicalOutput {
-  
+
   protected ExternalSorter sorter;
   protected Configuration conf;
   protected int numOutputs;
@@ -56,6 +56,7 @@ public class OnFileSortedOutput implements LogicalOutput {
   private long endTime;
   
   
+    
   @Override
   public List<Event> initialize(TezOutputContext outputContext)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 5c49da3..4007816 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -68,7 +68,10 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
         .getUserPayload());
     this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
         outputContext.getWorkDirs());
-    
+
+    // TEZ 815. Fix this. Not used until there's a separation between init and start.
+    this.outputContext.requestInitialMemory(0l, null);
+
     this.dataViaEventsEnabled = conf.getBoolean(
         TezJobConfig.TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED,
         TezJobConfig.TEZ_RUNTIME_BROADCAST_DATA_VIA_EVENTS_ENABLED_DEFAULT);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
index 2d663a6..b502b1d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/broadcast/input/TestBroadcastInputManager.java
@@ -18,7 +18,9 @@
 
 package org.apache.tez.runtime.library.broadcast.input;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
 import java.io.IOException;
 import java.util.UUID;
 
@@ -26,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
 import org.junit.Test;
@@ -50,7 +53,10 @@ public class TestBroadcastInputManager {
     long inMemThreshold = (long) (bufferPercent * jvmMax);
     LOG.info("InMemThreshold: " + inMemThreshold);
     
-    BroadcastInputManager inputManager = new BroadcastInputManager(UUID.randomUUID().toString(), conf);
+    TezInputContext mockInputContext = mock(TezInputContext.class);
+    BroadcastInputManager inputManager = new BroadcastInputManager(UUID.randomUUID().toString(),
+        conf, mockInputContext);
+    
     
     long requestSize = (long) (0.4f * inMemThreshold);
     long compressedSize = 1l;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index b68be2d..4f48ad2 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.library.output;
 
+import static org.mockito.Mockito.mock;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -48,6 +50,7 @@ import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.common.resources.MemoryDistributor;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
@@ -96,7 +99,7 @@ public class TestOnFileUnorderedKVOutput {
     conf.setStrings(TezJobConfig.LOCAL_DIRS, workDir.toString());
 
     int appAttemptNumber = 1;
-    TezUmbilical tezUmbilical = null; // ZZZ TestUmbilical from mapreduce
+    TezUmbilical tezUmbilical = null;
     String taskVertexName = "currentVertex";
     String destinationVertexName = "destinationVertex";
     TezDAGID dagID = TezDAGID.getInstance("2000", 1, 1);
@@ -114,11 +117,11 @@ public class TestOnFileUnorderedKVOutput {
     bb.position(0);
     AuxiliaryServiceHelper.setServiceDataIntoEnv(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, bb, auxEnv);
 
-    
+
     TezOutputContext outputContext = new TezOutputContextImpl(conf,
         appAttemptNumber, tezUmbilical, taskVertexName, destinationVertexName,
         taskAttemptID, counters, 0, userPayload, runtimeTask,
-        null, auxEnv);
+        null, auxEnv, new MemoryDistributor(1, 1, conf) , mock(OutputDescriptor.class));
 
     List<Event> events = null;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 8da0c96..b24db78 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -188,6 +188,7 @@ public class TestInput implements LogicalInput {
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws Exception {
     this.inputContext = inputContext;
+    this.inputContext.requestInitialMemory(0l, null); //Mandatory call. Fix null in TEZ-815.
     if (inputContext.getUserPayload() != null) {
       String vName = inputContext.getTaskVertexName();
       conf = MRHelpers.createConfFromUserPayload(inputContext.getUserPayload());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/50589893/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index 40e56f1..1e71b08 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -46,6 +46,7 @@ public class TestOutput implements LogicalOutput {
   public List<Event> initialize(TezOutputContext outputContext)
       throws Exception {
     this.outputContext = outputContext;
+    this.outputContext.requestInitialMemory(0l, null); //Mandatory call. Fix null in TEZ-815.
     return null;
   }
 


Mime
View raw message