hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1531741 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java...
Date Sun, 13 Oct 2013 20:17:53 GMT
Author: sseth
Date: Sun Oct 13 20:17:53 2013
New Revision: 1531741

URL: http://svn.apache.org/r1531741
Log:
MAPREDUCE-5329. Allow MR applications to use additional AuxServices, which are compatible
with the default MapReduce shuffle. Contributed by Avner BenHanoch.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestShuffleProvider.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1531741&r1=1531740&r2=1531741&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sun Oct 13 20:17:53 2013
@@ -198,6 +198,10 @@ Release 2.2.1 - UNRELEASED
   NEW FEATURES
 
   IMPROVEMENTS
+  
+    MAPREDUCE-5329. Allow MR applications to use additional AuxServices,
+    which are compatible with the default MapReduce shuffle.
+    (Avner BenHanoch via sseth)
 
   OPTIMIZATIONS
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1531741&r1=1531740&r2=1531741&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Sun Oct 13 20:17:53 2013
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.Collection;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -722,6 +723,36 @@ public abstract class TaskAttemptImpl im
       serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
           ShuffleHandler.serializeServiceData(shuffleToken));
 
+      // Add external shuffle providers, if any were configured for this job.
+      Collection<String> shuffleProviders = conf.getStringCollection(
+          MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES);
+      if (!shuffleProviders.isEmpty()) {
+        // NOTE(review): auxNames is read from `conf` on the AM side; presumably
+        // it mirrors the NodeManagers' yarn-site.xml — confirm.
+        Collection<String> auxNames = conf.getStringCollection(
+            YarnConfiguration.NM_AUX_SERVICES);
+
+        for (final String shuffleProvider : shuffleProviders) {
+          if (shuffleProvider.equals(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID)) {
+            // Skip the built-in shuffle provider: its service data (the
+            // shuffle secret key) was already inserted above.
+            continue;
+          }
+          if (!auxNames.contains(shuffleProvider)) {
+            throw new YarnRuntimeException("ShuffleProvider Service: " + shuffleProvider
+                + " was NOT found in the list of aux-services that are available in this NM."
+                + " You may need to specify this ShuffleProvider as an aux-service"
+                + " in your yarn-site.xml");
+          }
+          LOG.info("Adding ShuffleProvider Service: " + shuffleProvider + " to serviceData");
+          // The empty payload only serves for INIT_APP notifications. The
+          // shuffle service must be able to work with the host:port information
+          // provided by the AM (i.e. shuffle services which require a custom
+          // location or other configuration are not supported).
+          serviceData.put(shuffleProvider, ByteBuffer.allocate(0));
+        }
+      }
+
       Apps.addToEnvironment(
           environment,  
           Environment.CLASSPATH.name(), 

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestShuffleProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestShuffleProvider.java?rev=1531741&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestShuffleProvider.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestShuffleProvider.java Sun Oct 13 20:17:53 2013
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.impl;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapTaskAttemptImpl;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Test;
+import org.junit.Assert;
+
+/**
+ * Verifies that {@code TaskAttemptImpl.createContainerLaunchContext} adds a
+ * service-data entry for each configured shuffle provider.
+ */
+public class TestShuffleProvider {
+
+  @Test
+  public void testShuffleProviders() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 1);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    Path jobFile = mock(Path.class);
+
+    EventHandler eventHandler = mock(EventHandler.class);
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+
+    jobConf.set(YarnConfiguration.NM_AUX_SERVICES,
+      TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID + "," +
+      TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID);
+
+    String serviceName = TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID;
+    String serviceStr = String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, serviceName);
+    jobConf.set(serviceStr, TestShuffleHandler1.class.getName());
+
+    serviceName = TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID;
+    serviceStr = String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, serviceName);
+    jobConf.set(serviceStr, TestShuffleHandler2.class.getName());
+
+    jobConf.set(MRJobConfig.MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES,
+        TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID
+        + "," + TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID);
+
+    Credentials credentials = new Credentials();
+    Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
+        ("tokenid").getBytes(), ("tokenpw").getBytes(),
+        new Text("tokenkind"), new Text("tokenservice"));
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+            mock(TaskSplitMetaInfo.class), jobConf, taListener,
+            jobToken, credentials,
+            new SystemClock(), null);
+
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
+
+    ContainerLaunchContext launchCtx =
+        TaskAttemptImpl.createContainerLaunchContext(null,
+            jobConf, jobToken, taImpl.createRemoteTask(),
+            TypeConverter.fromYarn(jobId),
+            mock(WrappedJvmID.class), taListener,
+            credentials);
+
+    Map<String, ByteBuffer> serviceDataMap = launchCtx.getServiceData();
+    Assert.assertNotNull("TestShuffleHandler1 is missing",
+        serviceDataMap.get(TestShuffleHandler1.MAPREDUCE_TEST_SHUFFLE_SERVICEID));
+    Assert.assertNotNull("TestShuffleHandler2 is missing",
+        serviceDataMap.get(TestShuffleHandler2.MAPREDUCE_TEST_SHUFFLE_SERVICEID));
+    // 2 that we entered + 1 for the built-in shuffle-provider
+    Assert.assertEquals("mismatch number of services in map", 3,
+        serviceDataMap.size());
+  }
+
+  static public class StubbedFS extends RawLocalFileSystem {
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      return new FileStatus(1, false, 1, 1, 1, f);
+    }
+  }
+
+  static public class TestShuffleHandler1 extends AuxiliaryService {
+    public static final String MAPREDUCE_TEST_SHUFFLE_SERVICEID = "test_shuffle1";
+    public TestShuffleHandler1() {
+      super("testshuffle1");
+    }
+    @Override
+    public void initializeApplication(ApplicationInitializationContext context) {
+    }
+    @Override
+    public void stopApplication(ApplicationTerminationContext context) {
+    }
+    @Override
+    public synchronized ByteBuffer getMetaData() {
+      return ByteBuffer.allocate(0); // Don't 'return null' because of YARN-1256
+    }
+  }
+
+  static public class TestShuffleHandler2 extends AuxiliaryService {
+    public static final String MAPREDUCE_TEST_SHUFFLE_SERVICEID = "test_shuffle2";
+    public TestShuffleHandler2() {
+      super("testshuffle2");
+    }
+    @Override
+    public void initializeApplication(ApplicationInitializationContext context) {
+    }
+    @Override
+    public void stopApplication(ApplicationTerminationContext context) {
+    }
+    @Override
+    public synchronized ByteBuffer getMetaData() {
+      return ByteBuffer.allocate(0); // Don't 'return null' because of YARN-1256
+    }
+  }
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1531741&r1=1531740&r2=1531741&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Sun Oct 13 20:17:53 2013
@@ -133,6 +133,18 @@ public interface MRJobConfig {
 
   public static final String MAPREDUCE_JOB_CLASSLOADER = "mapreduce.job.classloader";
 
+  /**
+   * A comma-separated list of services that function as ShuffleProvider
+   * aux-services (in addition to the built-in ShuffleHandler). These services
+   * can serve shuffle requests from reduce tasks.
+   * <p>
+   * Each listed service other than the built-in one must also appear in
+   * {@code YarnConfiguration.NM_AUX_SERVICES}; otherwise creating the task
+   * container launch context fails with a YarnRuntimeException.
+   */
+  public static final String MAPREDUCE_JOB_SHUFFLE_PROVIDER_SERVICES =
+      "mapreduce.job.shuffle.provider.services";
+
   public static final String MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES = "mapreduce.job.classloader.system.classes";
 
   public static final String MAPREDUCE_JVM_SYSTEM_PROPERTIES_TO_LOG = "mapreduce.jvm.system-properties-to-log";



Mime
View raw message