spark-commits mailing list archives

From r...@apache.org
Subject [08/14] spark git commit: [SPARK-13529][BUILD] Move network/* modules into common/network-*
Date Mon, 29 Feb 2016 01:25:24 GMT
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
new file mode 100644
index 0000000..9379412
--- /dev/null
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.OneForOneStreamManager;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.shuffle.protocol.OpenBlocks;
+import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
+import org.apache.spark.network.shuffle.protocol.StreamHandle;
+import org.apache.spark.network.shuffle.protocol.UploadBlock;
+
+public class ExternalShuffleBlockHandlerSuite {
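+  // The transport client is mocked once; the stream manager, resolver, and handler are
+  // recreated before each test in beforeEach().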
+  TransportClient client = mock(TransportClient.class);
+
+  OneForOneStreamManager streamManager;
+  ExternalShuffleBlockResolver blockResolver;
+  RpcHandler handler;
+
+  @Before
+  public void beforeEach() {
+    streamManager = mock(OneForOneStreamManager.class);
+    blockResolver = mock(ExternalShuffleBlockResolver.class);
+    handler = new ExternalShuffleBlockHandler(streamManager, blockResolver);
+  }
+
+  @Test
+  public void testRegisterExecutor() {
+    RpcResponseCallback callback = mock(RpcResponseCallback.class);
+
+    ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort");
+    ByteBuffer registerMessage = new RegisterExecutor("app0", "exec1", config).toByteBuffer();
+    handler.receive(client, registerMessage, callback);
+    verify(blockResolver, times(1)).registerExecutor("app0", "exec1", config);
+
+    verify(callback, times(1)).onSuccess(any(ByteBuffer.class));
+    verify(callback, never()).onFailure(any(Throwable.class));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testOpenShuffleBlocks() {
+    RpcResponseCallback callback = mock(RpcResponseCallback.class);
+
+    ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
+    ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
+    when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
+    when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
+    ByteBuffer openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" })
+      .toByteBuffer();
+    handler.receive(client, openBlocks, callback);
+    verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
+    verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");
+
+    ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class);
+    verify(callback, times(1)).onSuccess(response.capture());
+    verify(callback, never()).onFailure((Throwable) any());
+
+    StreamHandle handle =
+      (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response.getValue());
+    assertEquals(2, handle.numChunks);
+
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<Iterator<ManagedBuffer>> stream = (ArgumentCaptor<Iterator<ManagedBuffer>>)
+        (ArgumentCaptor<?>) ArgumentCaptor.forClass(Iterator.class);
+    verify(streamManager, times(1)).registerStream(anyString(), stream.capture());
+    Iterator<ManagedBuffer> buffers = stream.getValue();
+    assertEquals(block0Marker, buffers.next());
+    assertEquals(block1Marker, buffers.next());
+    assertFalse(buffers.hasNext());
+  }
+
+  @Test
+  public void testBadMessages() {
+    RpcResponseCallback callback = mock(RpcResponseCallback.class);
+
+    ByteBuffer unserializableMsg = ByteBuffer.wrap(new byte[] { 0x12, 0x34, 0x56 });
+    try {
+      handler.receive(client, unserializableMsg, callback);
+      fail("Should have thrown");
+    } catch (Exception e) {
+      // pass
+    }
+
+    ByteBuffer unexpectedMsg = new UploadBlock("a", "e", "b", new byte[1], new byte[2]).toByteBuffer();
+    try {
+      handler.receive(client, unexpectedMsg, callback);
+      fail("Should have thrown");
+    } catch (UnsupportedOperationException e) {
+      // pass
+    }
+
+    verify(callback, never()).onSuccess(any(ByteBuffer.class));
+    verify(callback, never()).onFailure(any(Throwable.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
new file mode 100644
index 0000000..60a1b8b
--- /dev/null
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.CharStreams;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ExternalShuffleBlockResolverSuite {
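+  // Shuffle block contents written to disk once in beforeAll() and read back through the resolver.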
+  static String sortBlock0 = "Hello!";
+  static String sortBlock1 = "World!";
+
+  static String hashBlock0 = "Elementary";
+  static String hashBlock1 = "Tabular";
+
+  static TestShuffleDataContext dataContext;
+
+  static TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+
+  @BeforeClass
+  public static void beforeAll() throws IOException {
+    dataContext = new TestShuffleDataContext(2, 5);
+
+    dataContext.create();
+    // Write some sort and hash data.
+    dataContext.insertSortShuffleData(0, 0,
+      new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } );
+    dataContext.insertHashShuffleData(1, 0,
+      new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } );
+  }
+
+  @AfterClass
+  public static void afterAll() {
+    dataContext.cleanup();
+  }
+
+  @Test
+  public void testBadRequests() throws IOException {
+    ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
+    // Unregistered executor
+    try {
+      resolver.getBlockData("app0", "exec1", "shuffle_1_1_0");
+      fail("Should have failed");
+    } catch (RuntimeException e) {
+      assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
+    }
+
+    // Invalid shuffle manager
+    resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
+    try {
+      resolver.getBlockData("app0", "exec2", "shuffle_1_1_0");
+      fail("Should have failed");
+    } catch (UnsupportedOperationException e) {
+      // pass
+    }
+
+    // Nonexistent shuffle block
+    resolver.registerExecutor("app0", "exec3",
+      dataContext.createExecutorInfo("sort"));
+    try {
+      resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
+      fail("Should have failed");
+    } catch (Exception e) {
+      // pass
+    }
+  }
+
+  @Test
+  public void testSortShuffleBlocks() throws IOException {
+    ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
+    resolver.registerExecutor("app0", "exec0",
+      dataContext.createExecutorInfo("sort"));
+
+    InputStream block0Stream =
+      resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
+    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+    block0Stream.close();
+    assertEquals(sortBlock0, block0);
+
+    InputStream block1Stream =
+      resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
+    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+    block1Stream.close();
+    assertEquals(sortBlock1, block1);
+  }
+
+  @Test
+  public void testHashShuffleBlocks() throws IOException {
+    ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
+    resolver.registerExecutor("app0", "exec0",
+      dataContext.createExecutorInfo("hash"));
+
+    InputStream block0Stream =
+      resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
+    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+    block0Stream.close();
+    assertEquals(hashBlock0, block0);
+
+    InputStream block1Stream =
+      resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
+    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+    block1Stream.close();
+    assertEquals(hashBlock1, block1);
+  }
+
+  @Test
+  public void jsonSerializationOfExecutorRegistration() throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    AppExecId appId = new AppExecId("foo", "bar");
+    String appIdJson = mapper.writeValueAsString(appId);
+    AppExecId parsedAppId = mapper.readValue(appIdJson, AppExecId.class);
+    assertEquals(parsedAppId, appId);
+
+    ExecutorShuffleInfo shuffleInfo =
+      new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "hash");
+    String shuffleJson = mapper.writeValueAsString(shuffleInfo);
+    ExecutorShuffleInfo parsedShuffleInfo =
+      mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
+    assertEquals(parsedShuffleInfo, shuffleInfo);
+
+    // Intentionally keep these hard-coded strings in here, to check backwards-compatibility.
+    // It's not legacy yet, but we keep it here in case anybody changes the format.
+    String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
+    assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
+    String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
+      "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"hash\"}";
+    assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
new file mode 100644
index 0000000..532d7ab
--- /dev/null
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
+public class ExternalShuffleCleanupSuite {
+
+  // Same-thread Executor used to ensure cleanup happens synchronously in the test thread.
+  Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+  TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+
+  @Test
+  public void noCleanupAndCleanup() throws IOException {
+    TestShuffleDataContext dataContext = createSomeData();
+
+    ExternalShuffleBlockResolver resolver =
+      new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
+    resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+    resolver.applicationRemoved("app", false /* cleanup */);
+
+    assertStillThere(dataContext);
+
+    resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
+    resolver.applicationRemoved("app", true /* cleanup */);
+
+    assertCleanedUp(dataContext);
+  }
+
+  @Test
+  public void cleanupUsesExecutor() throws IOException {
+    TestShuffleDataContext dataContext = createSomeData();
+
+    final AtomicBoolean cleanupCalled = new AtomicBoolean(false);
+
+    // Executor that records the call but never runs the task, to verify cleanup is handed off to it.
+    Executor noThreadExecutor = new Executor() {
+      @Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
+    };
+
+    ExternalShuffleBlockResolver manager =
+      new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
+
+    manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+    manager.applicationRemoved("app", true);
+
+    assertTrue(cleanupCalled.get());
+    assertStillThere(dataContext);
+
+    dataContext.cleanup();
+    assertCleanedUp(dataContext);
+  }
+
+  @Test
+  public void cleanupMultipleExecutors() throws IOException {
+    TestShuffleDataContext dataContext0 = createSomeData();
+    TestShuffleDataContext dataContext1 = createSomeData();
+
+    ExternalShuffleBlockResolver resolver =
+      new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
+
+    resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+    resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
+    resolver.applicationRemoved("app", true);
+
+    assertCleanedUp(dataContext0);
+    assertCleanedUp(dataContext1);
+  }
+
+  @Test
+  public void cleanupOnlyRemovedApp() throws IOException {
+    TestShuffleDataContext dataContext0 = createSomeData();
+    TestShuffleDataContext dataContext1 = createSomeData();
+
+    ExternalShuffleBlockResolver resolver =
+      new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
+
+    resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+    resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
+
+    resolver.applicationRemoved("app-nonexistent", true);
+    assertStillThere(dataContext0);
+    assertStillThere(dataContext1);
+
+    resolver.applicationRemoved("app-0", true);
+    assertCleanedUp(dataContext0);
+    assertStillThere(dataContext1);
+
+    resolver.applicationRemoved("app-1", true);
+    assertCleanedUp(dataContext0);
+    assertCleanedUp(dataContext1);
+
+    // Make sure it's not an error to clean up multiple times
+    resolver.applicationRemoved("app-1", true);
+    assertCleanedUp(dataContext0);
+    assertCleanedUp(dataContext1);
+  }
+
+  private void assertStillThere(TestShuffleDataContext dataContext) {
+    for (String localDir : dataContext.localDirs) {
+      assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
+    }
+  }
+
+  private void assertCleanedUp(TestShuffleDataContext dataContext) {
+    for (String localDir : dataContext.localDirs) {
+      assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists());
+    }
+  }
+
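+  // Creates a data context with temp dirs holding a little sort- and hash-shuffle data.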
+  private TestShuffleDataContext createSomeData() throws IOException {
+    Random rand = new Random(123);
+    TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
+
+    dataContext.create();
+    dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000),
+      new byte[][] { "ABC".getBytes(), "DEF".getBytes() } );
+    dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000,
+      new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } );
+    return dataContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
new file mode 100644
index 0000000..5e706bf
--- /dev/null
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.apache.spark.network.TestUtils;
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
+public class ExternalShuffleIntegrationSuite {
+
+  static String APP_ID = "app-id";
+  static String SORT_MANAGER = "sort";
+  static String HASH_MANAGER = "hash";
+
+  // Executor 0 is sort-based
+  static TestShuffleDataContext dataContext0;
+  // Executor 1 is hash-based
+  static TestShuffleDataContext dataContext1;
+
+  static ExternalShuffleBlockHandler handler;
+  static TransportServer server;
+  static TransportConf conf;
+
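+  // Block payloads for each executor; filled with random bytes in beforeAll().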
+  static byte[][] exec0Blocks = new byte[][] {
+    new byte[123],
+    new byte[12345],
+    new byte[1234567],
+  };
+
+  static byte[][] exec1Blocks = new byte[][] {
+    new byte[321],
+    new byte[54321],
+  };
+
+  @BeforeClass
+  public static void beforeAll() throws IOException {
+    Random rand = new Random();
+
+    for (byte[] block : exec0Blocks) {
+      rand.nextBytes(block);
+    }
+    for (byte[] block: exec1Blocks) {
+      rand.nextBytes(block);
+    }
+
+    dataContext0 = new TestShuffleDataContext(2, 5);
+    dataContext0.create();
+    dataContext0.insertSortShuffleData(0, 0, exec0Blocks);
+
+    dataContext1 = new TestShuffleDataContext(6, 2);
+    dataContext1.create();
+    dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
+
+    conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+    handler = new ExternalShuffleBlockHandler(conf, null);
+    TransportContext transportContext = new TransportContext(conf, handler);
+    server = transportContext.createServer();
+  }
+
+  @AfterClass
+  public static void afterAll() {
+    dataContext0.cleanup();
+    dataContext1.cleanup();
+    server.close();
+  }
+
+  @After
+  public void afterEach() {
+    handler.applicationRemoved(APP_ID, false /* cleanupLocalDirs */);
+  }
+
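+  // Collects a fetch's outcome: block ids that succeeded or failed, plus the retained buffers.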
+  class FetchResult {
+    public Set<String> successBlocks;
+    public Set<String> failedBlocks;
+    public List<ManagedBuffer> buffers;
+
+    public void releaseBuffers() {
+      for (ManagedBuffer buffer : buffers) {
+        buffer.release();
+      }
+    }
+  }
+
+  // Fetch a set of blocks from a pre-registered executor.
+  private FetchResult fetchBlocks(String execId, String[] blockIds) throws Exception {
+    return fetchBlocks(execId, blockIds, server.getPort());
+  }
+
+  // Fetch a set of blocks from a pre-registered executor. Connects to the server on the given
+  // port, which lets tests deliberately target an invalid server.
+  private FetchResult fetchBlocks(String execId, String[] blockIds, int port) throws Exception {
+    final FetchResult res = new FetchResult();
+    res.successBlocks = Collections.synchronizedSet(new HashSet<String>());
+    res.failedBlocks = Collections.synchronizedSet(new HashSet<String>());
+    res.buffers = Collections.synchronizedList(new LinkedList<ManagedBuffer>());
+
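+    // Released once per block callback; tryAcquire below waits until every block has completed.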
+    final Semaphore requestsRemaining = new Semaphore(0);
+
+    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
+    client.init(APP_ID);
+    client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
+      new BlockFetchingListener() {
+        @Override
+        public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
+          synchronized (this) {
+            if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
+              data.retain();
+              res.successBlocks.add(blockId);
+              res.buffers.add(data);
+              requestsRemaining.release();
+            }
+          }
+        }
+
+        @Override
+        public void onBlockFetchFailure(String blockId, Throwable exception) {
+          synchronized (this) {
+            if (!res.successBlocks.contains(blockId) && !res.failedBlocks.contains(blockId)) {
+              res.failedBlocks.add(blockId);
+              requestsRemaining.release();
+            }
+          }
+        }
+      });
+
+    if (!requestsRemaining.tryAcquire(blockIds.length, 5, TimeUnit.SECONDS)) {
+      fail("Timeout getting response from the server");
+    }
+    client.close();
+    return res;
+  }
+
+  @Test
+  public void testFetchOneSort() throws Exception {
+    registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
+    FetchResult exec0Fetch = fetchBlocks("exec-0", new String[] { "shuffle_0_0_0" });
+    assertEquals(Sets.newHashSet("shuffle_0_0_0"), exec0Fetch.successBlocks);
+    assertTrue(exec0Fetch.failedBlocks.isEmpty());
+    assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks[0]));
+    exec0Fetch.releaseBuffers();
+  }
+
+  @Test
+  public void testFetchThreeSort() throws Exception {
+    registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
+    FetchResult exec0Fetch = fetchBlocks("exec-0",
+      new String[] { "shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2" });
+    assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_0_0_1", "shuffle_0_0_2"),
+      exec0Fetch.successBlocks);
+    assertTrue(exec0Fetch.failedBlocks.isEmpty());
+    assertBufferListsEqual(exec0Fetch.buffers, Lists.newArrayList(exec0Blocks));
+    exec0Fetch.releaseBuffers();
+  }
+
+  @Test
+  public void testFetchHash() throws Exception {
+    registerExecutor("exec-1", dataContext1.createExecutorInfo(HASH_MANAGER));
+    FetchResult execFetch = fetchBlocks("exec-1",
+      new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
+    assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.successBlocks);
+    assertTrue(execFetch.failedBlocks.isEmpty());
+    assertBufferListsEqual(execFetch.buffers, Lists.newArrayList(exec1Blocks));
+    execFetch.releaseBuffers();
+  }
+
+  @Test
+  public void testFetchWrongShuffle() throws Exception {
+    registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
+    FetchResult execFetch = fetchBlocks("exec-1",
+      new String[] { "shuffle_1_0_0", "shuffle_1_0_1" });
+    assertTrue(execFetch.successBlocks.isEmpty());
+    assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
+  }
+
+  @Test
+  public void testFetchInvalidShuffle() throws Exception {
+    registerExecutor("exec-1", dataContext1.createExecutorInfo("unknown sort manager"));
+    FetchResult execFetch = fetchBlocks("exec-1",
+      new String[] { "shuffle_1_0_0" });
+    assertTrue(execFetch.successBlocks.isEmpty());
+    assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
+  }
+
+  @Test
+  public void testFetchWrongBlockId() throws Exception {
+    registerExecutor("exec-1", dataContext1.createExecutorInfo(SORT_MANAGER /* wrong manager */));
+    FetchResult execFetch = fetchBlocks("exec-1",
+      new String[] { "rdd_1_0_0" });
+    assertTrue(execFetch.successBlocks.isEmpty());
+    assertEquals(Sets.newHashSet("rdd_1_0_0"), execFetch.failedBlocks);
+  }
+
+  @Test
+  public void testFetchNonexistent() throws Exception {
+    registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
+    FetchResult execFetch = fetchBlocks("exec-0",
+      new String[] { "shuffle_2_0_0" });
+    assertTrue(execFetch.successBlocks.isEmpty());
+    assertEquals(Sets.newHashSet("shuffle_2_0_0"), execFetch.failedBlocks);
+  }
+
+  @Test
+  public void testFetchWrongExecutor() throws Exception {
+    registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
+    FetchResult execFetch = fetchBlocks("exec-0",
+      new String[] { "shuffle_0_0_0" /* right */, "shuffle_1_0_0" /* wrong */ });
+    // Both still fail, as we start by checking for all blocks.
+    assertTrue(execFetch.successBlocks.isEmpty());
+    assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
+  }
+
+  @Test
+  public void testFetchUnregisteredExecutor() throws Exception {
+    registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
+    FetchResult execFetch = fetchBlocks("exec-2",
+      new String[] { "shuffle_0_0_0", "shuffle_1_0_0" });
+    assertTrue(execFetch.successBlocks.isEmpty());
+    assertEquals(Sets.newHashSet("shuffle_0_0_0", "shuffle_1_0_0"), execFetch.failedBlocks);
+  }
+
+  @Test
+  public void testFetchNoServer() throws Exception {
+    System.setProperty("spark.shuffle.io.maxRetries", "0");
+    try {
+      registerExecutor("exec-0", dataContext0.createExecutorInfo(SORT_MANAGER));
+      FetchResult execFetch = fetchBlocks("exec-0",
+        new String[]{"shuffle_1_0_0", "shuffle_1_0_1"}, 1 /* port */);
+      assertTrue(execFetch.successBlocks.isEmpty());
+      assertEquals(Sets.newHashSet("shuffle_1_0_0", "shuffle_1_0_1"), execFetch.failedBlocks);
+    } finally {
+      System.clearProperty("spark.shuffle.io.maxRetries");
+    }
+  }
+
+  private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
+      throws IOException {
+    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
+    client.init(APP_ID);
+    client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
+      executorId, executorInfo);
+  }
+
+  private void assertBufferListsEqual(List<ManagedBuffer> list0, List<byte[]> list1)
+    throws Exception {
+    assertEquals(list0.size(), list1.size());
+    for (int i = 0; i < list0.size(); i ++) {
+      assertBuffersEqual(list0.get(i), new NioManagedBuffer(ByteBuffer.wrap(list1.get(i))));
+    }
+  }
+
+  private void assertBuffersEqual(ManagedBuffer buffer0, ManagedBuffer buffer1) throws Exception {
+    ByteBuffer nio0 = buffer0.nioByteBuffer();
+    ByteBuffer nio1 = buffer1.nioByteBuffer();
+
+    int len = nio0.remaining();
+    assertEquals(nio0.remaining(), nio1.remaining());
+    for (int i = 0; i < len; i ++) {
+      assertEquals(nio0.get(), nio1.get());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
new file mode 100644
index 0000000..08ddb37
--- /dev/null
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import org.apache.spark.network.TestUtils;
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.sasl.SaslServerBootstrap;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.server.TransportServerBootstrap;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+
+public class ExternalShuffleSecuritySuite {
+
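+  // A SASL-secured server is created before each test; validate() exercises the client side.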
+  TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+  TransportServer server;
+
+  @Before
+  public void beforeEach() throws IOException {
+    TransportContext context =
+      new TransportContext(conf, new ExternalShuffleBlockHandler(conf, null));
+    TransportServerBootstrap bootstrap = new SaslServerBootstrap(conf,
+        new TestSecretKeyHolder("my-app-id", "secret"));
+    this.server = context.createServer(Arrays.asList(bootstrap));
+  }
+
+  @After
+  public void afterEach() {
+    if (server != null) {
+      server.close();
+      server = null;
+    }
+  }
+
+  @Test
+  public void testValid() throws IOException {
+    validate("my-app-id", "secret", false);
+  }
+
+  @Test
+  public void testBadAppId() {
+    try {
+      validate("wrong-app-id", "secret", false);
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("Wrong appId!"));
+    }
+  }
+
+  @Test
+  public void testBadSecret() {
+    try {
+      validate("my-app-id", "bad-secret", false);
+    } catch (Exception e) {
+      assertTrue(e.getMessage(), e.getMessage().contains("Mismatched response"));
+    }
+  }
+
+  @Test
+  public void testEncryption() throws IOException {
+    validate("my-app-id", "secret", true);
+  }
+
+  /** Creates an ExternalShuffleClient and attempts to register with the server. */
+  private void validate(String appId, String secretKey, boolean encrypt) throws IOException {
+    ExternalShuffleClient client =
+      new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt);
+    client.init(appId);
+    // Registration either succeeds or throws an exception.
+    client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
+      new ExecutorShuffleInfo(new String[0], 0, ""));
+    client.close();
+  }
+
+  /** Provides a secret key holder which always returns the given secret key, for a single appId. */
+  static class TestSecretKeyHolder implements SecretKeyHolder {
+    private final String appId;
+    private final String secretKey;
+
+    TestSecretKeyHolder(String appId, String secretKey) {
+      this.appId = appId;
+      this.secretKey = secretKey;
+    }
+
+    @Override
+    public String getSaslUser(String appId) {
+      return "user";
+    }
+
+    @Override
+    public String getSecretKey(String appId) {
+      if (!appId.equals(this.appId)) {
+        throw new IllegalArgumentException("Wrong appId!");
+      }
+      return secretKey;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
new file mode 100644
index 0000000..2590b9c
--- /dev/null
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Maps;
+import io.netty.buffer.Unpooled;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.ChunkReceivedCallback;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.OpenBlocks;
+import org.apache.spark.network.shuffle.protocol.StreamHandle;
+
+public class OneForOneBlockFetcherSuite {
+  @Test
+  public void testFetchOne() {
+    LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
+    blocks.put("shuffle_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
+
+    BlockFetchingListener listener = fetchBlocks(blocks);
+
+    verify(listener).onBlockFetchSuccess("shuffle_0_0_0", blocks.get("shuffle_0_0_0"));
+  }
+
+  @Test
+  public void testFetchThree() {
+    LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
+    blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
+    blocks.put("b1", new NioManagedBuffer(ByteBuffer.wrap(new byte[23])));
+    blocks.put("b2", new NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[23])));
+
+    BlockFetchingListener listener = fetchBlocks(blocks);
+
+    for (int i = 0; i < 3; i ++) {
+      verify(listener, times(1)).onBlockFetchSuccess("b" + i, blocks.get("b" + i));
+    }
+  }
+
+  @Test
+  public void testFailure() {
+    LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
+    blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
+    blocks.put("b1", null);
+    blocks.put("b2", null);
+
+    BlockFetchingListener listener = fetchBlocks(blocks);
+
+    // Each failure causes a failure callback for every remaining block fetch, so b2 is failed twice.
+    verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0"));
+    verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) any());
+    verify(listener, times(2)).onBlockFetchFailure(eq("b2"), (Throwable) any());
+  }
+
+  @Test
+  public void testFailureAndSuccess() {
+    LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
+    blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
+    blocks.put("b1", null);
+    blocks.put("b2", new NioManagedBuffer(ByteBuffer.wrap(new byte[21])));
+
+    BlockFetchingListener listener = fetchBlocks(blocks);
+
+    // We may call both success and failure for the same block.
+    verify(listener, times(1)).onBlockFetchSuccess("b0", blocks.get("b0"));
+    verify(listener, times(1)).onBlockFetchFailure(eq("b1"), (Throwable) any());
+    verify(listener, times(1)).onBlockFetchSuccess("b2", blocks.get("b2"));
+    verify(listener, times(1)).onBlockFetchFailure(eq("b2"), (Throwable) any());
+  }
+
+  @Test
+  public void testEmptyBlockFetch() {
+    try {
+      fetchBlocks(Maps.<String, ManagedBuffer>newLinkedHashMap());
+      fail();
+    } catch (IllegalArgumentException e) {
+      assertEquals("Zero-sized blockIds array", e.getMessage());
+    }
+  }
+
+  /**
+   * Begins a fetch on the given set of blocks by mocking out the server side of the RPC which
+   * simply returns the given (BlockId, Block) pairs.
+   * As "blocks" is a LinkedHashMap, the blocks are guaranteed to be returned in the same order
+   * that they were inserted in.
+   *
+   * If a block's buffer is "null", an exception will be thrown instead.
+   */
+  private BlockFetchingListener fetchBlocks(final LinkedHashMap<String, ManagedBuffer> blocks) {
+    TransportClient client = mock(TransportClient.class);
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    final String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
+    OneForOneBlockFetcher fetcher =
+      new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener);
+
+    // Respond to the "OpenBlocks" message with an appropriate StreamHandle with streamId 123
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+        BlockTransferMessage message = BlockTransferMessage.Decoder.fromByteBuffer(
+          (ByteBuffer) invocationOnMock.getArguments()[0]);
+        RpcResponseCallback callback = (RpcResponseCallback) invocationOnMock.getArguments()[1];
+        callback.onSuccess(new StreamHandle(123, blocks.size()).toByteBuffer());
+        assertEquals(new OpenBlocks("app-id", "exec-id", blockIds), message);
+        return null;
+      }
+    }).when(client).sendRpc(any(ByteBuffer.class), any(RpcResponseCallback.class));
+
+    // Respond to each chunk request with a single buffer from our blocks array.
+    final AtomicInteger expectedChunkIndex = new AtomicInteger(0);
+    final Iterator<ManagedBuffer> blockIterator = blocks.values().iterator();
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        try {
+          long streamId = (Long) invocation.getArguments()[0];
+          int myChunkIndex = (Integer) invocation.getArguments()[1];
+          assertEquals(123, streamId);
+          assertEquals(expectedChunkIndex.getAndIncrement(), myChunkIndex);
+
+          ChunkReceivedCallback callback = (ChunkReceivedCallback) invocation.getArguments()[2];
+          ManagedBuffer result = blockIterator.next();
+          if (result != null) {
+            callback.onSuccess(myChunkIndex, result);
+          } else {
+            callback.onFailure(myChunkIndex, new RuntimeException("Failed " + myChunkIndex));
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+          fail("Unexpected failure");
+        }
+        return null;
+      }
+    }).when(client).fetchChunk(anyLong(), anyInt(), (ChunkReceivedCallback) any());
+
+    fetcher.start();
+    return listener;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
new file mode 100644
index 0000000..3a6ef0d
--- /dev/null
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.Stubber;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.util.SystemPropertyConfigProvider;
+import org.apache.spark.network.util.TransportConf;
+import static org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchStarter;
+
+/**
+ * Tests retry logic by throwing IOExceptions and ensuring that subsequent attempts are made to
+ * fetch the lost blocks.
+ */
+public class RetryingBlockFetcherSuite {
+
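+  // Dummy block contents handed back by the mocked BlockFetchStarter in each interaction.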
+  ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
+  ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
+  ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
+
+  @Before
+  public void beforeEach() {
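+    // Allow up to 2 retries with no wait between attempts so the retry tests run quickly.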
+    System.setProperty("spark.shuffle.io.maxRetries", "2");
+    System.setProperty("spark.shuffle.io.retryWait", "0");
+  }
+
+  @After
+  public void afterEach() {
+    System.clearProperty("spark.shuffle.io.maxRetries");
+    System.clearProperty("spark.shuffle.io.retryWait");
+  }
+
+  @Test
+  public void testNoFailures() throws IOException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+      // Immediately return both blocks successfully.
+      ImmutableMap.<String, Object>builder()
+        .put("b0", block0)
+        .put("b1", block1)
+        .build()
+      );
+
+    performInteractions(interactions, listener);
+
+    verify(listener).onBlockFetchSuccess("b0", block0);
+    verify(listener).onBlockFetchSuccess("b1", block1);
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testUnrecoverableFailure() throws IOException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+      // b0 throws a non-IOException error, so it will be failed without retry.
+      ImmutableMap.<String, Object>builder()
+        .put("b0", new RuntimeException("Ouch!"))
+        .put("b1", block1)
+        .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener).onBlockFetchFailure(eq("b0"), (Throwable) any());
+    verify(listener).onBlockFetchSuccess("b1", block1);
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testSingleIOExceptionOnFirst() throws IOException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+      // IOException will cause a retry. Since b0 fails, we will retry both.
+      ImmutableMap.<String, Object>builder()
+        .put("b0", new IOException("Connection failed or something"))
+        .put("b1", block1)
+        .build(),
+      ImmutableMap.<String, Object>builder()
+        .put("b0", block0)
+        .put("b1", block1)
+        .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
+    verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testSingleIOExceptionOnSecond() throws IOException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+      // IOException will cause a retry. Since b1 fails, we will not retry b0.
+      ImmutableMap.<String, Object>builder()
+        .put("b0", block0)
+        .put("b1", new IOException("Connection failed or something"))
+        .build(),
+      ImmutableMap.<String, Object>builder()
+        .put("b1", block1)
+        .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
+    verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testTwoIOExceptions() throws IOException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+      // b0's IOException will trigger retry, b1's will be ignored.
+      ImmutableMap.<String, Object>builder()
+        .put("b0", new IOException())
+        .put("b1", new IOException())
+        .build(),
+      // Next, b0 is successful and b1 errors again, so we just request that one.
+      ImmutableMap.<String, Object>builder()
+        .put("b0", block0)
+        .put("b1", new IOException())
+        .build(),
+      // b1 returns successfully within 2 retries.
+      ImmutableMap.<String, Object>builder()
+        .put("b1", block1)
+        .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
+    verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testThreeIOExceptions() throws IOException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+      // b0's IOException will trigger retry, b1's will be ignored.
+      ImmutableMap.<String, Object>builder()
+        .put("b0", new IOException())
+        .put("b1", new IOException())
+        .build(),
+      // Next, b0 is successful and b1 errors again, so we just request that one.
+      ImmutableMap.<String, Object>builder()
+        .put("b0", block0)
+        .put("b1", new IOException())
+        .build(),
+      // b1 errors again, but this was the last retry
+      ImmutableMap.<String, Object>builder()
+        .put("b1", new IOException())
+        .build(),
+      // This is not reached -- b1 has failed.
+      ImmutableMap.<String, Object>builder()
+        .put("b1", block1)
+        .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
+    verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) any());
+    verifyNoMoreInteractions(listener);
+  }
+
+  @Test
+  public void testRetryAndUnrecoverable() throws IOException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+      // b0's IOException will trigger retry, subsequent messages will be ignored.
+      ImmutableMap.<String, Object>builder()
+        .put("b0", new IOException())
+        .put("b1", new RuntimeException())
+        .put("b2", block2)
+        .build(),
+      // Next, b0 is successful, b1 errors unrecoverably, and b2 triggers a retry.
+      ImmutableMap.<String, Object>builder()
+        .put("b0", block0)
+        .put("b1", new RuntimeException())
+        .put("b2", new IOException())
+        .build(),
+      // b2 succeeds in its last retry.
+      ImmutableMap.<String, Object>builder()
+        .put("b2", block2)
+        .build()
+    );
+
+    performInteractions(interactions, listener);
+
+    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
+    verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) any());
+    verify(listener, timeout(5000)).onBlockFetchSuccess("b2", block2);
+    verifyNoMoreInteractions(listener);
+  }
+
+  /**
+   * Performs a set of interactions in response to block requests from a RetryingBlockFetcher.
+   * Each interaction is a Map from BlockId to either ManagedBuffer or Exception. This interaction
+   * means "respond to the next block fetch request with these Successful buffers and these Failure
+   * exceptions". We verify that the expected block ids are exactly the ones requested.
+   *
+   * If multiple interactions are supplied, they will be used in order. This is useful for encoding
+   * retries -- the first interaction may include an IOException, which causes a retry of some
+   * subset of the original blocks in a second interaction.
+   */
+  @SuppressWarnings("unchecked")
+  private static void performInteractions(List<? extends Map<String, Object>> interactions,
+                                          BlockFetchingListener listener)
+    throws IOException {
+
+    TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+    BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
+
+    Stubber stub = null;
+
+    // Contains all blockIds that are referenced across all interactions.
+    final LinkedHashSet<String> blockIds = Sets.newLinkedHashSet();
+
+    for (final Map<String, Object> interaction : interactions) {
+      blockIds.addAll(interaction.keySet());
+
+      Answer<Void> answer = new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+          try {
+            // Verify that the RetryingBlockFetcher requested the expected blocks.
+            String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0];
+            String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]);
+            assertArrayEquals(desiredBlockIds, requestedBlockIds);
+
+            // Now actually invoke the success/failure callbacks on each block.
+            BlockFetchingListener retryListener =
+              (BlockFetchingListener) invocationOnMock.getArguments()[1];
+            for (Map.Entry<String, Object> block : interaction.entrySet()) {
+              String blockId = block.getKey();
+              Object blockValue = block.getValue();
+
+              if (blockValue instanceof ManagedBuffer) {
+                retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) blockValue);
+              } else if (blockValue instanceof Exception) {
+                retryListener.onBlockFetchFailure(blockId, (Exception) blockValue);
+              } else {
+                fail("Can only handle ManagedBuffers and Exceptions, got " + blockValue);
+              }
+            }
+            return null;
+          } catch (Throwable e) {
+            e.printStackTrace();
+            throw e;
+          }
+        }
+      };
+
+      // This is either the first stub, or should be chained behind the prior ones.
+      if (stub == null) {
+        stub = doAnswer(answer);
+      } else {
+        stub.doAnswer(answer);
+      }
+    }
+
+    assert stub != null;
+    stub.when(fetchStarter).createAndStart((String[]) any(), (BlockFetchingListener) anyObject());
+    String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
+    new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
+  }
+}
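For reference, the loop in performInteractions above builds a single Mockito Stubber and chains one answer per interaction, so consecutive calls to createAndStart receive consecutive interactions. A minimal, self-contained sketch of that chaining idiom, assuming a hypothetical Starter interface that is not part of this patch:

    import static org.mockito.Matchers.anyString;
    import static org.mockito.Mockito.doNothing;
    import static org.mockito.Mockito.doThrow;
    import static org.mockito.Mockito.mock;

    import java.io.IOException;

    public class ChainedStubSketch {
      // Hypothetical collaborator, standing in for BlockFetchStarter in the suite above.
      interface Starter {
        void start(String blockId) throws IOException;
      }

      public static void main(String[] args) throws IOException {
        Starter starter = mock(Starter.class);
        // Consecutive stubs apply to consecutive calls: the first call throws a
        // retryable failure, the second call (the retry) succeeds.
        doThrow(new IOException("retryable")).doNothing().when(starter).start(anyString());

        try {
          starter.start("b0");   // first attempt fails
        } catch (IOException e) {
          starter.start("b0");   // retry succeeds
        }
      }
    }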

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
new file mode 100644
index 0000000..7ac1ca1
--- /dev/null
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+
+/**
+ * Manages some sort- and hash-based shuffle data, including the creation
+ * and cleanup of directories that can be read by the {@link ExternalShuffleBlockResolver}.
+ */
+public class TestShuffleDataContext {
+  public final String[] localDirs;
+  public final int subDirsPerLocalDir;
+
+  public TestShuffleDataContext(int numLocalDirs, int subDirsPerLocalDir) {
+    this.localDirs = new String[numLocalDirs];
+    this.subDirsPerLocalDir = subDirsPerLocalDir;
+  }
+
+  public void create() {
+    for (int i = 0; i < localDirs.length; i ++) {
+      localDirs[i] = Files.createTempDir().getAbsolutePath();
+
+      for (int p = 0; p < subDirsPerLocalDir; p ++) {
+        new File(localDirs[i], String.format("%02x", p)).mkdirs();
+      }
+    }
+  }
+
+  public void cleanup() {
+    for (String localDir : localDirs) {
+      deleteRecursively(new File(localDir));
+    }
+  }
+
+  /** Creates reducer blocks in a sort-based data format within our local dirs. */
+  public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
+    String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
+
+    OutputStream dataStream = null;
+    DataOutputStream indexStream = null;
+    boolean suppressExceptionsDuringClose = true;
+
+    try {
+      dataStream = new FileOutputStream(
+        ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
+      indexStream = new DataOutputStream(new FileOutputStream(
+        ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
+
+      long offset = 0;
+      indexStream.writeLong(offset);
+      for (byte[] block : blocks) {
+        offset += block.length;
+        dataStream.write(block);
+        indexStream.writeLong(offset);
+      }
+      suppressExceptionsDuringClose = false;
+    } finally {
+      Closeables.close(dataStream, suppressExceptionsDuringClose);
+      Closeables.close(indexStream, suppressExceptionsDuringClose);
+    }
+  }
+
+  /** Creates reducer blocks in a hash-based data format within our local dirs. */
+  public void insertHashShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
+    for (int i = 0; i < blocks.length; i ++) {
+      String blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + i;
+      Files.write(blocks[i],
+        ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId));
+    }
+  }
+
+  /**
+   * Creates an ExecutorShuffleInfo object based on the given shuffle manager which targets this
+   * context's directories.
+   */
+  public ExecutorShuffleInfo createExecutorInfo(String shuffleManager) {
+    return new ExecutorShuffleInfo(localDirs, subDirsPerLocalDir, shuffleManager);
+  }
+
+  private static void deleteRecursively(File f) {
+    assert f != null;
+    if (f.isDirectory()) {
+      File[] children = f.listFiles();
+      if (children != null) {
+        for (File child : children) {
+          deleteRecursively(child);
+        }
+      }
+    }
+    f.delete();
+  }
+}
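The sort-based layout written by insertSortShuffleData is one concatenated .data file plus an .index file containing (numBlocks + 1) longs, so block i spans the byte range [index[i], index[i+1]) of the data file. A hedged read-side sketch under that assumption (the helper name is illustrative; in Spark the actual lookup lives in ExternalShuffleBlockResolver):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class SortIndexReadSketch {
      // Reads reducer block `reduceId` back out of the .data/.index pair written by
      // insertSortShuffleData: the index holds (numBlocks + 1) longs, and block i
      // spans bytes [index[i], index[i+1]) of the data file.
      static byte[] readBlock(File indexFile, File dataFile, int reduceId) throws IOException {
        long start;
        long end;
        try (RandomAccessFile index = new RandomAccessFile(indexFile, "r")) {
          index.seek(reduceId * 8L);   // each offset is one 8-byte long
          start = index.readLong();
          end = index.readLong();
        }
        byte[] block = new byte[(int) (end - start)];
        try (RandomAccessFile data = new RandomAccessFile(dataFile, "r")) {
          data.seek(start);
          data.readFully(block);
        }
        return block;
      }
    }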

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-yarn/pom.xml
----------------------------------------------------------------------
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
new file mode 100644
index 0000000..3cb4432
--- /dev/null
+++ b/common/network-yarn/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-network-yarn_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project YARN Shuffle Service</name>
+  <url>http://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>network-yarn</sbt.project.name>
+    <!-- Make sure all Hadoop dependencies are provided to avoid repackaging. -->
+    <hadoop.deps.scope>provided</hadoop.deps.scope>
+    <shuffle.jar>${project.build.directory}/scala-${scala.binary.version}/spark-${project.version}-yarn-shuffle.jar</shuffle.jar>
+    <shade>org/spark-project/</shade>
+  </properties>
+
+  <dependencies>
+    <!-- Core dependencies -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-network-shuffle_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+    </dependency>
+
+    <!-- Provided dependencies -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <outputFile>${shuffle.jar}</outputFile>
+          <artifactSet>
+            <includes>
+              <include>*:*</include>
+            </includes>
+          </artifactSet>
+          <filters>
+            <filter>
+              <artifact>*:*</artifact>
+              <excludes>
+                <exclude>META-INF/*.SF</exclude>
+                <exclude>META-INF/*.DSA</exclude>
+                <exclude>META-INF/*.RSA</exclude>
+              </excludes>
+            </filter>
+          </filters>
+          <relocations>
+            <relocation>
+              <pattern>com.fasterxml.jackson</pattern>
+              <shadedPattern>org.spark-project.com.fasterxml.jackson</shadedPattern>
+              <includes>
+                <include>com.fasterxml.jackson.**</include>
+              </includes>
+            </relocation>
+          </relocations>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Probes to validate that the dependencies which must be shaded actually are shaded. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <target>
+                <macrodef name="shaded">
+                  <attribute name="resource"/>
+                  <sequential>
+                    <fail message="Not found ${shade}@{resource}">
+                      <condition>
+                        <not>
+                          <resourceexists>
+                            <zipentry zipfile="${shuffle.jar}" name="${shade}@{resource}"/>
+                          </resourceexists>
+                        </not>
+                      </condition>
+                    </fail>
+                  </sequential>
+                </macrodef>
+                <echo>Verifying dependency shading</echo>
+                <shaded resource="com/fasterxml/jackson/core/JsonParser.class" />
+                <shaded resource="com/fasterxml/jackson/annotation/JacksonAnnotation.class" />
+                <shaded resource="com/fasterxml/jackson/databind/JsonSerializer.class" />
+              </target>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
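The maven-antrun-plugin execution above fails the build when the relocated Jackson classes are missing from the shaded jar. The same probe expressed as a small Java sketch using java.util.zip (the jar path comes in as a program argument; this is illustrative only, not part of the build):

    import java.io.IOException;
    import java.util.zip.ZipFile;

    public class ShadeCheckSketch {
      // Rough Java equivalent of the antrun probe: fail if a Jackson class was not
      // relocated under org/spark-project/ inside the shuffle jar (path in args[0]).
      public static void main(String[] args) throws IOException {
        String shadePrefix = "org/spark-project/";
        String resource = "com/fasterxml/jackson/core/JsonParser.class";
        try (ZipFile jar = new ZipFile(args[0])) {
          if (jar.getEntry(shadePrefix + resource) == null) {
            throw new IllegalStateException("Not found " + shadePrefix + resource);
          }
        }
      }
    }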

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
----------------------------------------------------------------------
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
new file mode 100644
index 0000000..ba6d30a
--- /dev/null
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.yarn;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.sasl.SaslServerBootstrap;
+import org.apache.spark.network.sasl.ShuffleSecretManager;
+import org.apache.spark.network.server.TransportServer;
+import org.apache.spark.network.server.TransportServerBootstrap;
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.yarn.util.HadoopConfigProvider;
+
+/**
+ * An external shuffle service used by Spark on Yarn.
+ *
+ * This is intended to be a long-running auxiliary service that runs in the NodeManager process.
+ * A Spark application may connect to this service by setting `spark.shuffle.service.enabled` to
+ * `true`. The application also automatically derives the service port from
+ * `spark.shuffle.service.port` in the Yarn configuration, so that the clients and the server
+ * agree on the same port to communicate on.
+ *
+ * The service also optionally supports authentication. This ensures that executors from one
+ * application cannot read the shuffle files written by those from another. This feature can be
+ * enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM.
+ * Note that the Spark application must also set `spark.authenticate` manually and, unlike in
+ * the case of the service port, will not inherit this setting from the Yarn configuration. This
+ * is because an application running on the same Yarn cluster may choose to not use the external
+ * shuffle service, in which case its setting of `spark.authenticate` should be independent of
+ * the service's.
+ */
+public class YarnShuffleService extends AuxiliaryService {
+  private final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);
+
+  // Port on which the shuffle server listens for fetch requests
+  private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
+  private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
+
+  // Whether the shuffle server should authenticate fetch requests
+  private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
+  private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
+
+  // An entity that manages the shuffle secret per application
+  // This is used only if authentication is enabled
+  private ShuffleSecretManager secretManager;
+
+  // The actual server that serves shuffle files
+  private TransportServer shuffleServer = null;
+
+  // Handles registering executors and opening shuffle blocks
+  @VisibleForTesting
+  ExternalShuffleBlockHandler blockHandler;
+
+  // Where to store & reload executor info for recovering state after an NM restart
+  @VisibleForTesting
+  File registeredExecutorFile;
+
+  // just for testing when you want to find an open port
+  @VisibleForTesting
+  static int boundPort = -1;
+
+  // Just for integration tests that want to look at this file -- in general it is not
+  // sensible to keep this as a static field.
+  @VisibleForTesting
+  static YarnShuffleService instance;
+
+  public YarnShuffleService() {
+    super("spark_shuffle");
+    logger.info("Initializing YARN shuffle service for Spark");
+    instance = this;
+  }
+
+  /**
+   * Return whether authentication is enabled as specified by the configuration.
+   * If so, fetch requests will fail unless the appropriate authentication secret
+   * for the application is provided.
+   */
+  private boolean isAuthenticationEnabled() {
+    return secretManager != null;
+  }
+
+  /**
+   * Start the shuffle server with the given configuration.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) {
+
+    // In case this NM was killed while there were running Spark applications, we need to restore
+    // lost state for the existing executors. We look for an existing file in the NM's local dirs.
+    // If we don't find one, then we choose a file to use to save the state next time. Even if
+    // an application was stopped while the NM was down, we expect YARN to call stopApplication()
+    // when it comes back.
+    registeredExecutorFile =
+      findRegisteredExecutorFile(conf.getStrings("yarn.nodemanager.local-dirs"));
+
+    TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
+    // If authentication is enabled, set up the shuffle server to use a
+    // special RPC handler that filters out unauthenticated fetch requests
+    boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
+    try {
+      blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
+    } catch (Exception e) {
+      logger.error("Failed to initialize external shuffle service", e);
+    }
+
+    List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
+    if (authEnabled) {
+      secretManager = new ShuffleSecretManager();
+      bootstraps.add(new SaslServerBootstrap(transportConf, secretManager));
+    }
+
+    int port = conf.getInt(
+      SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
+    TransportContext transportContext = new TransportContext(transportConf, blockHandler);
+    shuffleServer = transportContext.createServer(port, bootstraps);
+    // The port should normally be fixed, but for tests it's useful to find an open port
+    port = shuffleServer.getPort();
+    boundPort = port;
+    String authEnabledString = authEnabled ? "enabled" : "not enabled";
+    logger.info("Started YARN shuffle service for Spark on port {}. " +
+      "Authentication is {}.  Registered executor file is {}", port, authEnabledString,
+      registeredExecutorFile);
+  }
+
+  @Override
+  public void initializeApplication(ApplicationInitializationContext context) {
+    String appId = context.getApplicationId().toString();
+    try {
+      ByteBuffer shuffleSecret = context.getApplicationDataForService();
+      logger.info("Initializing application {}", appId);
+      if (isAuthenticationEnabled()) {
+        secretManager.registerApp(appId, shuffleSecret);
+      }
+    } catch (Exception e) {
+      logger.error("Exception when initializing application {}", appId, e);
+    }
+  }
+
+  @Override
+  public void stopApplication(ApplicationTerminationContext context) {
+    String appId = context.getApplicationId().toString();
+    try {
+      logger.info("Stopping application {}", appId);
+      if (isAuthenticationEnabled()) {
+        secretManager.unregisterApp(appId);
+      }
+      blockHandler.applicationRemoved(appId, false /* cleanupLocalDirs */);
+    } catch (Exception e) {
+      logger.error("Exception when stopping application {}", appId, e);
+    }
+  }
+
+  @Override
+  public void initializeContainer(ContainerInitializationContext context) {
+    ContainerId containerId = context.getContainerId();
+    logger.info("Initializing container {}", containerId);
+  }
+
+  @Override
+  public void stopContainer(ContainerTerminationContext context) {
+    ContainerId containerId = context.getContainerId();
+    logger.info("Stopping container {}", containerId);
+  }
+
+  private File findRegisteredExecutorFile(String[] localDirs) {
+    for (String dir: localDirs) {
+      File f = new File(dir, "registeredExecutors.ldb");
+      if (f.exists()) {
+        return f;
+      }
+    }
+    return new File(localDirs[0], "registeredExecutors.ldb");
+  }
+
+  /**
+   * Close the shuffle server to clean up any associated state.
+   */
+  @Override
+  protected void serviceStop() {
+    try {
+      if (shuffleServer != null) {
+        shuffleServer.close();
+      }
+      if (blockHandler != null) {
+        blockHandler.close();
+      }
+    } catch (Exception e) {
+      logger.error("Exception when stopping service", e);
+    }
+  }
+
+  // Not currently used
+  @Override
+  public ByteBuffer getMetaData() {
+    return ByteBuffer.allocate(0);
+  }
+}
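On the application side, the class comment above translates into a handful of SparkConf settings. A hedged sketch of what a client application might set (the port value is just the default shown in this file, and spark.authenticate only makes sense if the NodeManager side enables it as well):

    import org.apache.spark.SparkConf;

    public class ShuffleServiceConfSketch {
      public static void main(String[] args) {
        // Illustrative application-side settings for using the external shuffle service.
        SparkConf conf = new SparkConf()
          .set("spark.shuffle.service.enabled", "true")
          .set("spark.shuffle.service.port", "7337")   // must match the NM's configured port
          .set("spark.authenticate", "true");          // only if the NM also sets spark.authenticate
        // Pass `conf` to the SparkContext / JavaSparkContext as usual.
      }
    }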

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/common/network-yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
----------------------------------------------------------------------
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
new file mode 100644
index 0000000..8848617
--- /dev/null
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/util/HadoopConfigProvider.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.yarn.util;
+
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.spark.network.util.ConfigProvider;
+
+/** Use the Hadoop configuration to obtain config values. */
+public class HadoopConfigProvider extends ConfigProvider {
+  private final Configuration conf;
+
+  public HadoopConfigProvider(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public String get(String name) {
+    String value = conf.get(name);
+    if (value == null) {
+      throw new NoSuchElementException(name);
+    }
+    return value;
+  }
+}
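A short usage sketch, mirroring how serviceInit() above wires this provider into a TransportConf. Keys missing from the Hadoop configuration surface as NoSuchElementException from get(), which lets TransportConf fall back to its built-in defaults for optional settings; the key shown is only an example:

    import org.apache.hadoop.conf.Configuration;

    import org.apache.spark.network.util.TransportConf;
    import org.apache.spark.network.yarn.util.HadoopConfigProvider;

    public class HadoopConfigProviderSketch {
      public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("spark.shuffle.io.serverThreads", "8");   // example key
        // Back a TransportConf for the "shuffle" module with the NM's Hadoop configuration.
        TransportConf transportConf =
          new TransportConf("shuffle", new HadoopConfigProvider(hadoopConf));
      }
    }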

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 07c3078..4e04672 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -473,11 +473,11 @@ yarn = Module(
     dependencies=[],
     source_file_regexes=[
         "yarn/",
-        "network/yarn/",
+        "common/network-yarn/",
     ],
     sbt_test_goals=[
         "yarn/test",
-        "network-yarn/test",
+        "common/network-yarn/test",
     ],
     test_tags=[
         "org.apache.spark.tags.ExtendedYarnTest"

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/docs/job-scheduling.md
----------------------------------------------------------------------
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index 00b6a18..083c020 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -88,7 +88,7 @@ In YARN mode, start the shuffle service on each `NodeManager` as follows:
 1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a
 pre-packaged distribution.
 2. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
-`$SPARK_HOME/network/yarn/target/scala-<version>` if you are building Spark yourself, and under
+`$SPARK_HOME/common/network-yarn/target/scala-<version>` if you are building Spark yourself, and under
 `lib` if you are using a distribution.
 2. Add this jar to the classpath of all `NodeManager`s in your cluster.
 3. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index 68af143..45c2c00 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -148,7 +148,8 @@ abstract class AbstractCommandBuilder {
       String scala = getScalaVersion();
       List<String> projects = Arrays.asList("core", "repl", "mllib", "graphx",
         "streaming", "tools", "sql/catalyst", "sql/core", "sql/hive", "sql/hive-thriftserver",
-        "yarn", "launcher", "network/common", "network/shuffle", "network/yarn");
+        "yarn", "launcher",
+        "common/network-common", "common/network-shuffle", "common/network-yarn");
       if (prependClasses) {
         if (!isTesting) {
           System.err.println(

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/make-distribution.sh
----------------------------------------------------------------------
diff --git a/make-distribution.sh b/make-distribution.sh
index 2099814..ac90ea3 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -169,7 +169,7 @@ cp "$SPARK_HOME"/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/"
 cp "$SPARK_HOME"/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/"
 # This will fail if the -Pyarn profile is not provided
 # In this case, silence the error and ignore the return code of this command
-cp "$SPARK_HOME"/network/yarn/target/scala*/spark-*-yarn-shuffle.jar "$DISTDIR/lib/" &> /dev/null || :
+cp "$SPARK_HOME"/common/network-yarn/target/scala*/spark-*-yarn-shuffle.jar "$DISTDIR/lib/" &> /dev/null || :
 
 # Copy example sources (needed for python and SQL)
 mkdir -p "$DISTDIR/examples/src/main"

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/pom.xml
----------------------------------------------------------------------
diff --git a/network/common/pom.xml b/network/common/pom.xml
deleted file mode 100644
index bd507c2..0000000
--- a/network/common/pom.xml
+++ /dev/null
@@ -1,103 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <groupId>org.apache.spark</groupId>
-  <artifactId>spark-network-common_2.11</artifactId>
-  <packaging>jar</packaging>
-  <name>Spark Project Networking</name>
-  <url>http://spark.apache.org/</url>
-  <properties>
-    <sbt.project.name>network-common</sbt.project.name>
-  </properties>
-
-  <dependencies>
-    <!-- Core dependencies -->
-    <dependency>
-      <groupId>io.netty</groupId>
-      <artifactId>netty-all</artifactId>
-    </dependency>
-
-    <!-- Provided dependencies -->
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.code.findbugs</groupId>
-      <artifactId>jsr305</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <scope>compile</scope>
-    </dependency>
-
-    <!-- Test dependencies -->
-    <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-    <plugins>
-      <!-- Create a test-jar so network-shuffle can depend on our test utilities. -->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>test-jar-on-test-compile</id>
-            <phase>test-compile</phase>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

