Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1E177E541 for ; Wed, 6 Mar 2013 01:53:13 +0000 (UTC) Received: (qmail 62777 invoked by uid 500); 6 Mar 2013 01:53:12 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 62718 invoked by uid 500); 6 Mar 2013 01:53:12 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 62710 invoked by uid 99); 6 Mar 2013 01:53:12 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Mar 2013 01:53:12 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,NORMAL_HTTP_TO_IP X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Mar 2013 01:53:08 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id EF3322388B45; Wed, 6 Mar 2013 01:52:47 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1453127 - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/ hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apa... Date: Wed, 06 Mar 2013 01:52:47 -0000 To: mapreduce-commits@hadoop.apache.org From: suresh@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130306015247.EF3322388B45@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: suresh Date: Wed Mar 6 01:52:45 2013 New Revision: 1453127 URL: http://svn.apache.org/r1453127 Log: Merging latest trunk to branch-trunk-win Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/ (props changed) hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (contents, props changed) hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/ (props changed) hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (contents, props changed) hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1452660-1453119 Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt?rev=1453127&r1=1453126&r2=1453127&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt Wed Mar 6 01:52:45 2013 @@ -731,6 +731,9 @@ Release 0.23.7 - UNRELEASED MAPREDUCE-4989. JSONify DataTables input data for Attempts page (Ravi Prakash via jlowe) + MAPREDUCE-5027. Shuffle does not limit number of outstanding connections + (Robert Parker via jeagles) + OPTIMIZATIONS MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1452660-1453119 Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1452660-1453119 Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1453127&r1=1453126&r2=1453127&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Wed Mar 6 01:52:45 2013 @@ -296,6 +296,14 @@ + mapreduce.shuffle.max.connections + 0 + Max allowed connections for the shuffle. Set to 0 (zero) + to indicate no limit on the number of connections. + + + + mapreduce.reduce.markreset.buffer.percent 0.0 The percentage of memory -relative to the maximum heap size- to Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1452660-1453119 Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1453127&r1=1453126&r2=1453127&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Wed Mar 6 01:52:45 2013 @@ -88,6 +88,7 @@ import org.jboss.netty.channel.ChannelFu import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; @@ -121,7 +122,7 @@ public class ShuffleHandler extends Abst public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes"; public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; - + // pattern to identify errors related to the client closing the socket early // idea borrowed from Netty SslHandler private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile( @@ -133,15 +134,15 @@ public class ShuffleHandler extends Abst private final ChannelGroup accepted = new DefaultChannelGroup(); protected HttpPipelineFactory pipelineFact; private int sslFileBufferSize; - + /** * Should the shuffle use posix_fadvise calls to manage the OS cache during * sendfile */ private boolean manageOsCache; private int readaheadLength; + private int maxShuffleConnections; private ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); - public static final String MAPREDUCE_SHUFFLE_SERVICEID = "mapreduce.shuffle"; @@ -159,6 +160,9 @@ public class ShuffleHandler extends Abst public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024; + public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections"; + public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit + @Metrics(about="Shuffle output metrics", context="mapred") static class ShuffleMetrics implements ChannelFutureListener { @Metric("Shuffle output in bytes") @@ -270,6 +274,9 @@ public class ShuffleHandler extends Abst readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES, DEFAULT_SHUFFLE_READAHEAD_BYTES); + maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS, + DEFAULT_MAX_SHUFFLE_CONNECTIONS); + ThreadFactory bossFactory = new ThreadFactoryBuilder() .setNameFormat("ShuffleHandler Netty Boss #%d") .build(); @@ -400,6 +407,21 @@ public class ShuffleHandler extends Abst } @Override + public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) + throws Exception { + if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) { + LOG.info(String.format("Current number of shuffle connections (%d) is " + + "greater than or equal to the max allowed shuffle connections (%d)", + accepted.size(), maxShuffleConnections)); + evt.getChannel().close(); + return; + } + accepted.add(evt.getChannel()); + super.channelOpen(ctx, evt); + + } + + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception { HttpRequest request = (HttpRequest) evt.getMessage(); @@ -620,6 +642,5 @@ public class ShuffleHandler extends Abst sendError(ctx, INTERNAL_SERVER_ERROR); } } - } } Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1453127&r1=1453126&r2=1453127&view=diff ============================================================================== --- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original) +++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Wed Mar 6 01:52:45 2013 @@ -24,13 +24,15 @@ import static org.apache.hadoop.test.Moc import static org.apache.hadoop.test.MockitoMaker.stub; import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; import static org.junit.Assert.assertEquals; - import java.io.DataInputStream; import java.io.IOException; import java.net.HttpURLConnection; +import java.net.SocketException; import java.net.URL; import java.util.ArrayList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader; @@ -47,10 +49,13 @@ import org.jboss.netty.handler.codec.htt import org.junit.Assert; import org.junit.Test; + public class TestShuffleHandler { - static final long MiB = 1024 * 1024; + static final long MiB = 1024 * 1024; + private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class); - @Test public void testSerializeMeta() throws Exception { + @Test (timeout = 10000) + public void testSerializeMeta() throws Exception { assertEquals(1, ShuffleHandler.deserializeMetaData( ShuffleHandler.serializeMetaData(1))); assertEquals(-1, ShuffleHandler.deserializeMetaData( @@ -59,7 +64,8 @@ public class TestShuffleHandler { ShuffleHandler.serializeMetaData(8080))); } - @Test public void testShuffleMetrics() throws Exception { + @Test (timeout = 10000) + public void testShuffleMetrics() throws Exception { MetricsSystem ms = new MetricsSystemImpl(); ShuffleHandler sh = new ShuffleHandler(ms); ChannelFuture cf = make(stub(ChannelFuture.class). @@ -88,7 +94,7 @@ public class TestShuffleHandler { assertGauge("ShuffleConnections", connections, rb); } - @Test + @Test (timeout = 10000) public void testClientClosesConnection() throws Exception { final ArrayList failures = new ArrayList(1); Configuration conf = new Configuration(); @@ -159,4 +165,84 @@ public class TestShuffleHandler { Assert.assertTrue("sendError called when client closed connection", failures.size() == 0); } + + @Test (timeout = 10000) + public void testMaxConnections() throws Exception { + + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + ShuffleHandler shuffleHandler = new ShuffleHandler() { + @Override + protected Shuffle getShuffle(Configuration conf) { + // replace the shuffle handler with one stubbed for testing + return new Shuffle(conf) { + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, + HttpRequest request, HttpResponse response, URL requestUri) + throws IOException { + } + @Override + protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, + Channel ch, String user, String jobId, String mapId, int reduce) + throws IOException { + // send a shuffle header and a lot of data down the channel + // to trigger a broken pipe + ShuffleHeader header = + new ShuffleHeader("dummy_header", 5678, 5678, 1); + DataOutputBuffer dob = new DataOutputBuffer(); + header.write(dob); + ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + dob = new DataOutputBuffer(); + for (int i=0; i<100000; ++i) { + header.write(dob); + } + return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + } + }; + } + }; + shuffleHandler.init(conf); + shuffleHandler.start(); + + // setup connections + int connAttempts = 3; + HttpURLConnection conns[] = new HttpURLConnection[connAttempts]; + + for (int i = 0; i < connAttempts; i++) { + String URLstring = "http://127.0.0.1:" + + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_" + + i + "_0"; + URL url = new URL(URLstring); + conns[i] = (HttpURLConnection)url.openConnection(); + } + + // Try to open numerous connections + for (int i = 0; i < connAttempts; i++) { + conns[i].connect(); + } + + //Ensure first connections are okay + conns[0].getInputStream(); + int rc = conns[0].getResponseCode(); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + + conns[1].getInputStream(); + rc = conns[1].getResponseCode(); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + + // This connection should be closed because it to above the limit + try { + conns[2].getInputStream(); + rc = conns[2].getResponseCode(); + Assert.fail("Expected a SocketException"); + } catch (SocketException se) { + LOG.info("Expected - connection should not be open"); + } catch (Exception e) { + Assert.fail("Expected a SocketException"); + } + + shuffleHandler.stop(); + } }