spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From van...@apache.org
Subject spark git commit: [SPARK-10745][CORE] Separate configs between shuffle and RPC
Date Wed, 18 Nov 2015 20:53:44 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 ad1445561 -> 34a776798


[SPARK-10745][CORE] Separate configs between shuffle and RPC

[SPARK-6028](https://issues.apache.org/jira/browse/SPARK-6028) uses network module to implement
RPC. However, there are some configurations named with `spark.shuffle` prefix in the network
module.

This PR refactors them to make sure the user can control them in shuffle and RPC separately.
The user can use `spark.rpc.*` to set the configuration for netty RPC.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9481 from zsxwing/SPARK-10745.

(cherry picked from commit 7c5b641808740ba5eed05ba8204cdbaf3fc579f5)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34a77679
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34a77679
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34a77679

Branch: refs/heads/branch-1.6
Commit: 34a77679877bc40b58a10ec539a8da00fed7db39
Parents: ad14455
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Wed Nov 18 12:53:22 2015 -0800
Committer: Marcelo Vanzin <vanzin@cloudera.com>
Committed: Wed Nov 18 12:53:37 2015 -0800

----------------------------------------------------------------------
 .../spark/deploy/ExternalShuffleService.scala   |  3 +-
 .../netty/NettyBlockTransferService.scala       |  2 +-
 .../network/netty/SparkTransportConf.scala      | 12 ++--
 .../apache/spark/rpc/netty/NettyRpcEnv.scala    |  8 +--
 .../mesos/CoarseMesosSchedulerBackend.scala     |  2 +-
 .../shuffle/FileShuffleBlockResolver.scala      |  2 +-
 .../shuffle/IndexShuffleBlockResolver.scala     |  2 +-
 .../org/apache/spark/storage/BlockManager.scala |  2 +-
 .../spark/ExternalShuffleServiceSuite.scala     |  2 +-
 .../spark/network/util/TransportConf.java       | 65 +++++++++++++++-----
 .../network/ChunkFetchIntegrationSuite.java     |  2 +-
 .../network/RequestTimeoutIntegrationSuite.java |  2 +-
 .../spark/network/RpcIntegrationSuite.java      |  2 +-
 .../org/apache/spark/network/StreamSuite.java   |  2 +-
 .../network/TransportClientFactorySuite.java    |  6 +-
 .../spark/network/sasl/SparkSaslSuite.java      |  6 +-
 .../network/sasl/SaslIntegrationSuite.java      |  2 +-
 .../ExternalShuffleBlockResolverSuite.java      |  2 +-
 .../shuffle/ExternalShuffleCleanupSuite.java    |  2 +-
 .../ExternalShuffleIntegrationSuite.java        |  2 +-
 .../shuffle/ExternalShuffleSecuritySuite.java   |  2 +-
 .../shuffle/RetryingBlockFetcherSuite.java      |  2 +-
 .../spark/network/yarn/YarnShuffleService.java  |  2 +-
 23 files changed, 84 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index a039d54..e8a1e35 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -45,7 +45,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
 
-  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores
= 0)
+  private val transportConf =
+    SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
   private val blockHandler = newShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext =
     new TransportContext(transportConf, blockHandler, true)

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 70a42f9..b0694e3 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -41,7 +41,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
   // TODO: Don't use Java serialization, use a more cross-version compatible serialization
format.
   private val serializer = new JavaSerializer(conf)
   private val authEnabled = securityManager.isAuthenticationEnabled()
-  private val transportConf = SparkTransportConf.fromSparkConf(conf, numCores)
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numCores)
 
   private[this] var transportContext: TransportContext = _
   private[this] var server: TransportServer = _

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index cef2030..84833f5 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -40,23 +40,23 @@ object SparkTransportConf {
 
   /**
    * Utility for creating a [[TransportConf]] from a [[SparkConf]].
+   * @param _conf the [[SparkConf]]
+   * @param module the module name
    * @param numUsableCores if nonzero, this will restrict the server and client threads to
only
    *                       use the given number of cores, rather than all of the machine's
cores.
    *                       This restriction will only occur if these properties are not already
set.
    */
-  def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
+  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf
= {
     val conf = _conf.clone
 
     // Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
     // assuming we have all the machine's cores).
     // NB: Only set if serverThreads/clientThreads not already set.
     val numThreads = defaultNumThreads(numUsableCores)
-    conf.set("spark.shuffle.io.serverThreads",
-      conf.get("spark.shuffle.io.serverThreads", numThreads.toString))
-    conf.set("spark.shuffle.io.clientThreads",
-      conf.get("spark.shuffle.io.clientThreads", numThreads.toString))
+    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
+    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)
 
-    new TransportConf(new ConfigProvider {
+    new TransportConf(module, new ConfigProvider {
       override def get(name: String): String = conf.get(name)
     })
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 0909381..3e0c497 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -22,16 +22,13 @@ import java.net.{InetSocketAddress, URI}
 import java.nio.ByteBuffer
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy
+import javax.annotation.Nullable
 
-import scala.collection.mutable
 import scala.concurrent.{Future, Promise}
 import scala.reflect.ClassTag
 import scala.util.{DynamicVariable, Failure, Success}
 import scala.util.control.NonFatal
 
-import com.google.common.base.Preconditions
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.client._
@@ -49,7 +46,8 @@ private[netty] class NettyRpcEnv(
     securityManager: SecurityManager) extends RpcEnv(conf) with Logging {
 
   private val transportConf = SparkTransportConf.fromSparkConf(
-    conf.clone.set("spark.shuffle.io.numConnectionsPerPeer", "1"),
+    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
+    "rpc",
     conf.getInt("spark.rpc.io.threads", 0))
 
   private val dispatcher: Dispatcher = new Dispatcher(this)

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 2de9b6a..7d08eae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -109,7 +109,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
       Some(new MesosExternalShuffleClient(
-        SparkTransportConf.fromSparkConf(conf),
+        SparkTransportConf.fromSparkConf(conf, "shuffle"),
         securityManager,
         securityManager.isAuthenticationEnabled(),
         securityManager.isSaslEncryptionEnabled()))

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
index 39fadd8..cc5f933 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -46,7 +46,7 @@ private[spark] trait ShuffleWriterGroup {
 private[spark] class FileShuffleBlockResolver(conf: SparkConf)
   extends ShuffleBlockResolver with Logging {
 
-  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
 
   private lazy val blockManager = SparkEnv.get.blockManager
 

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 05b1eed..fadb8fe 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -47,7 +47,7 @@ private[spark] class IndexShuffleBlockResolver(
 
   private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager)
 
-  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
 
   def getDataFile(shuffleId: Int, mapId: Int): File = {
     blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 661c706..ab0007f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -122,7 +122,7 @@ private[spark] class BlockManager(
   // Client to read other executors' shuffle files. This is either an external service, or
just the
   // standard BlockTransferService to directly connect to other Executors.
   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
-    val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
+    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
     new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
       securityManager.isSaslEncryptionEnabled())
   } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 231f463..1c775bc 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -35,7 +35,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll
{
   var rpcHandler: ExternalShuffleBlockHandler = _
 
   override def beforeAll() {
-    val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2)
+    val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores
= 2)
     rpcHandler = new ExternalShuffleBlockHandler(transportConf, null)
     val transportContext = new TransportContext(transportConf, rpcHandler)
     server = transportContext.createServer()

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 3b2eff3..115135d 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -23,18 +23,53 @@ import com.google.common.primitives.Ints;
  * A central location that tracks all the settings we expose to users.
  */
 public class TransportConf {
+
+  private final String SPARK_NETWORK_IO_MODE_KEY;
+  private final String SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY;
+  private final String SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY;
+  private final String SPARK_NETWORK_IO_BACKLOG_KEY;
+  private final String SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY;
+  private final String SPARK_NETWORK_IO_SERVERTHREADS_KEY;
+  private final String SPARK_NETWORK_IO_CLIENTTHREADS_KEY;
+  private final String SPARK_NETWORK_IO_RECEIVEBUFFER_KEY;
+  private final String SPARK_NETWORK_IO_SENDBUFFER_KEY;
+  private final String SPARK_NETWORK_SASL_TIMEOUT_KEY;
+  private final String SPARK_NETWORK_IO_MAXRETRIES_KEY;
+  private final String SPARK_NETWORK_IO_RETRYWAIT_KEY;
+  private final String SPARK_NETWORK_IO_LAZYFD_KEY;
+
   private final ConfigProvider conf;
 
-  public TransportConf(ConfigProvider conf) {
+  private final String module;
+
+  public TransportConf(String module, ConfigProvider conf) {
+    this.module = module;
     this.conf = conf;
+    SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
+    SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
+    SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
+    SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
+    SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY =  getConfKey("io.numConnectionsPerPeer");
+    SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
+    SPARK_NETWORK_IO_CLIENTTHREADS_KEY = getConfKey("io.clientThreads");
+    SPARK_NETWORK_IO_RECEIVEBUFFER_KEY = getConfKey("io.receiveBuffer");
+    SPARK_NETWORK_IO_SENDBUFFER_KEY = getConfKey("io.sendBuffer");
+    SPARK_NETWORK_SASL_TIMEOUT_KEY = getConfKey("sasl.timeout");
+    SPARK_NETWORK_IO_MAXRETRIES_KEY = getConfKey("io.maxRetries");
+    SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
+    SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
+  }
+
+  private String getConfKey(String suffix) {
+    return "spark." + module + "." + suffix;
   }
 
   /** IO mode: nio or epoll */
-  public String ioMode() { return conf.get("spark.shuffle.io.mode", "NIO").toUpperCase();
}
+  public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase();
}
 
   /** If true, we will prefer allocating off-heap byte buffers within Netty. */
   public boolean preferDirectBufs() {
-    return conf.getBoolean("spark.shuffle.io.preferDirectBufs", true);
+    return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
   }
 
   /** Connect timeout in milliseconds. Default 120 secs. */
@@ -42,23 +77,23 @@ public class TransportConf {
     long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
       conf.get("spark.network.timeout", "120s"));
     long defaultTimeoutMs = JavaUtils.timeStringAsSec(
-      conf.get("spark.shuffle.io.connectionTimeout", defaultNetworkTimeoutS + "s")) * 1000;
+      conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
     return (int) defaultTimeoutMs;
   }
 
   /** Number of concurrent connections between two nodes for fetching data. */
   public int numConnectionsPerPeer() {
-    return conf.getInt("spark.shuffle.io.numConnectionsPerPeer", 1);
+    return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
   }
 
   /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog.
*/
-  public int backLog() { return conf.getInt("spark.shuffle.io.backLog", -1); }
+  public int backLog() { return conf.getInt(SPARK_NETWORK_IO_BACKLOG_KEY, -1); }
 
   /** Number of threads used in the server thread pool. Default to 0, which is 2x#cores.
*/
-  public int serverThreads() { return conf.getInt("spark.shuffle.io.serverThreads", 0); }
+  public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0);
}
 
   /** Number of threads used in the client thread pool. Default to 0, which is 2x#cores.
*/
-  public int clientThreads() { return conf.getInt("spark.shuffle.io.clientThreads", 0); }
+  public int clientThreads() { return conf.getInt(SPARK_NETWORK_IO_CLIENTTHREADS_KEY, 0);
}
 
   /**
    * Receive buffer size (SO_RCVBUF).
@@ -67,28 +102,28 @@ public class TransportConf {
    * Assuming latency = 1ms, network_bandwidth = 10Gbps
    *  buffer size should be ~ 1.25MB
    */
-  public int receiveBuf() { return conf.getInt("spark.shuffle.io.receiveBuffer", -1); }
+  public int receiveBuf() { return conf.getInt(SPARK_NETWORK_IO_RECEIVEBUFFER_KEY, -1); }
 
   /** Send buffer size (SO_SNDBUF). */
-  public int sendBuf() { return conf.getInt("spark.shuffle.io.sendBuffer", -1); }
+  public int sendBuf() { return conf.getInt(SPARK_NETWORK_IO_SENDBUFFER_KEY, -1); }
 
   /** Timeout for a single round trip of SASL token exchange, in milliseconds. */
   public int saslRTTimeoutMs() {
-    return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.sasl.timeout", "30s"))
* 1000;
+    return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_SASL_TIMEOUT_KEY, "30s"))
* 1000;
   }
 
   /**
    * Max number of times we will try IO exceptions (such as connection timeouts) per request.
    * If set to 0, we will not do any retries.
    */
-  public int maxIORetries() { return conf.getInt("spark.shuffle.io.maxRetries", 3); }
+  public int maxIORetries() { return conf.getInt(SPARK_NETWORK_IO_MAXRETRIES_KEY, 3); }
 
   /**
    * Time (in milliseconds) that we will wait in order to perform a retry after an IOException.
    * Only relevant if maxIORetries &gt; 0.
    */
   public int ioRetryWaitTimeMs() {
-    return (int) JavaUtils.timeStringAsSec(conf.get("spark.shuffle.io.retryWait", "5s"))
* 1000;
+    return (int) JavaUtils.timeStringAsSec(conf.get(SPARK_NETWORK_IO_RETRYWAIT_KEY, "5s"))
* 1000;
   }
 
   /**
@@ -101,11 +136,11 @@ public class TransportConf {
   }
 
   /**
-   * Whether to initialize shuffle FileDescriptor lazily or not. If true, file descriptors
are
+   * Whether to initialize FileDescriptor lazily or not. If true, file descriptors are
    * created only when data is going to be transferred. This can reduce the number of open
files.
    */
   public boolean lazyFileDescriptor() {
-    return conf.getBoolean("spark.shuffle.io.lazyFD", true);
+    return conf.getBoolean(SPARK_NETWORK_IO_LAZYFD_KEY, true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
index dfb7740..dc5fa1c 100644
--- a/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/ChunkFetchIntegrationSuite.java
@@ -83,7 +83,7 @@ public class ChunkFetchIntegrationSuite {
     fp.write(fileContent);
     fp.close();
 
-    final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
 
     streamManager = new StreamManager() {

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
b/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
index 84ebb33..42955ef 100644
--- a/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/RequestTimeoutIntegrationSuite.java
@@ -60,7 +60,7 @@ public class RequestTimeoutIntegrationSuite {
   public void setUp() throws Exception {
     Map<String, String> configMap = Maps.newHashMap();
     configMap.put("spark.shuffle.io.connectionTimeout", "2s");
-    conf = new TransportConf(new MapConfigProvider(configMap));
+    conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
 
     defaultManager = new StreamManager() {
       @Override

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
b/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
index 64b457b..8eb56bd 100644
--- a/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
@@ -49,7 +49,7 @@ public class RpcIntegrationSuite {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     rpcHandler = new RpcHandler() {
       @Override
       public void receive(TransportClient client, byte[] message, RpcResponseCallback callback)
{

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/StreamSuite.java b/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
index 6dcec83..00158fd 100644
--- a/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
@@ -89,7 +89,7 @@ public class StreamSuite {
       fp.close();
     }
 
-    final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     final StreamManager streamManager = new StreamManager() {
       @Override
       public ManagedBuffer getChunk(long streamId, int chunkIndex) {

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index f447137..dac7d4a 100644
--- a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -52,7 +52,7 @@ public class TransportClientFactorySuite {
 
   @Before
   public void setUp() {
-    conf = new TransportConf(new SystemPropertyConfigProvider());
+    conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     RpcHandler rpcHandler = new NoOpRpcHandler();
     context = new TransportContext(conf, rpcHandler);
     server1 = context.createServer();
@@ -76,7 +76,7 @@ public class TransportClientFactorySuite {
 
     Map<String, String> configMap = Maps.newHashMap();
     configMap.put("spark.shuffle.io.numConnectionsPerPeer", Integer.toString(maxConnections));
-    TransportConf conf = new TransportConf(new MapConfigProvider(configMap));
+    TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
 
     RpcHandler rpcHandler = new NoOpRpcHandler();
     TransportContext context = new TransportContext(conf, rpcHandler);
@@ -182,7 +182,7 @@ public class TransportClientFactorySuite {
 
   @Test
   public void closeIdleConnectionForRequestTimeOut() throws IOException, InterruptedException
{
-    TransportConf conf = new TransportConf(new ConfigProvider() {
+    TransportConf conf = new TransportConf("shuffle", new ConfigProvider() {
 
       @Override
       public String get(String name) {

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index 3469e84..b146899 100644
--- a/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -207,7 +207,7 @@ public class SparkSaslSuite {
   public void testEncryptedMessageChunking() throws Exception {
     File file = File.createTempFile("sasltest", ".txt");
     try {
-      TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+      TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
 
       byte[] data = new byte[8 * 1024];
       new Random().nextBytes(data);
@@ -242,7 +242,7 @@ public class SparkSaslSuite {
     final File file = File.createTempFile("sasltest", ".txt");
     SaslTestCtx ctx = null;
     try {
-      final TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+      final TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
       StreamManager sm = mock(StreamManager.class);
       when(sm.getChunk(anyLong(), anyInt())).thenAnswer(new Answer<ManagedBuffer>()
{
           @Override
@@ -368,7 +368,7 @@ public class SparkSaslSuite {
         boolean disableClientEncryption)
       throws Exception {
 
-      TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+      TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
 
       SecretKeyHolder keyHolder = mock(SecretKeyHolder.class);
       when(keyHolder.getSaslUser(anyString())).thenReturn("user");

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index c393a5e..1c2fa4d 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -70,7 +70,7 @@ public class SaslIntegrationSuite {
 
   @BeforeClass
   public static void beforeAll() throws IOException {
-    conf = new TransportConf(new SystemPropertyConfigProvider());
+    conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     context = new TransportContext(conf, new TestRpcHandler());
 
     secretKeyHolder = mock(SecretKeyHolder.class);

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index 3c6cb36..a995823 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -42,7 +42,7 @@ public class ExternalShuffleBlockResolverSuite {
 
   static TestShuffleDataContext dataContext;
 
-  static TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+  static TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
 
   @BeforeClass
   public static void beforeAll() throws IOException {

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index 2f4f1d0..532d7ab 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -35,7 +35,7 @@ public class ExternalShuffleCleanupSuite {
 
   // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
   Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
-  TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+  TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
 
   @Test
   public void noCleanupAndCleanup() throws IOException {

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index a3f9a38..2095f41 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -91,7 +91,7 @@ public class ExternalShuffleIntegrationSuite {
     dataContext1.create();
     dataContext1.insertHashShuffleData(1, 0, exec1Blocks);
 
-    conf = new TransportConf(new SystemPropertyConfigProvider());
+    conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     handler = new ExternalShuffleBlockHandler(conf, null);
     TransportContext transportContext = new TransportContext(conf, handler);
     server = transportContext.createServer();

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index aa99efd..08ddb37 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -39,7 +39,7 @@ import org.apache.spark.network.util.TransportConf;
 
 public class ExternalShuffleSecuritySuite {
 
-  TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+  TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
   TransportServer server;
 
   @Before

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index 06e46f9..3a6ef0d 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -254,7 +254,7 @@ public class RetryingBlockFetcherSuite {
                                           BlockFetchingListener listener)
     throws IOException {
 
-    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
+    TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
 
     Stubber stub = null;

http://git-wip-us.apache.org/repos/asf/spark/blob/34a77679/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
----------------------------------------------------------------------
diff --git a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index 11ea7f3..ba6d30a 100644
--- a/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/network/yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -120,7 +120,7 @@ public class YarnShuffleService extends AuxiliaryService {
     registeredExecutorFile =
       findRegisteredExecutorFile(conf.getStrings("yarn.nodemanager.local-dirs"));
 
-    TransportConf transportConf = new TransportConf(new HadoopConfigProvider(conf));
+    TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
     // If authentication is enabled, set up the shuffle server to use a
     // special RPC handler that filters out unauthenticated fetch requests
     boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message