spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject [05/14] spark git commit: [SPARK-13529][BUILD] Move network/* modules into common/network-*
Date Mon, 29 Feb 2016 01:25:21 GMT
http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java b/network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
deleted file mode 100644
index 431cb67..0000000
--- a/network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.sasl;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.RealmCallback;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-import java.io.IOException;
-import java.util.Map;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.base64.Base64;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A SASL Server for Spark which simply keeps track of the state of a single SASL session, from the
- * initial state to the "authenticated" state. (It is not a server in the sense of accepting
- * connections on some socket.)
- */
-public class SparkSaslServer implements SaslEncryptionBackend {
-  private final Logger logger = LoggerFactory.getLogger(SparkSaslServer.class);
-
-  /**
-   * This is passed as the server name when creating the sasl client/server.
-   * This could be changed to be configurable in the future.
-   */
-  static final String DEFAULT_REALM = "default";
-
-  /**
-   * The authentication mechanism used here is DIGEST-MD5. This could be changed to be
-   * configurable in the future.
-   */
-  static final String DIGEST = "DIGEST-MD5";
-
-  /**
-   * Quality of protection value that includes encryption.
-   */
-  static final String QOP_AUTH_CONF = "auth-conf";
-
-  /**
-   * Quality of protection value that does not include encryption.
-   */
-  static final String QOP_AUTH = "auth";
-
-  /** Identifier for a certain secret key within the secretKeyHolder. */
-  private final String secretKeyId;
-  private final SecretKeyHolder secretKeyHolder;
-  private SaslServer saslServer;
-
-  public SparkSaslServer(
-      String secretKeyId,
-      SecretKeyHolder secretKeyHolder,
-      boolean alwaysEncrypt) {
-    this.secretKeyId = secretKeyId;
-    this.secretKeyHolder = secretKeyHolder;
-
-    // Sasl.QOP is a comma-separated list of supported values. The value that allows encryption
-    // is listed first since it's preferred over the non-encrypted one (if the client also
-    // lists both in the request).
-    String qop = alwaysEncrypt ? QOP_AUTH_CONF : String.format("%s,%s", QOP_AUTH_CONF, QOP_AUTH);
-    Map<String, String> saslProps = ImmutableMap.<String, String>builder()
-      .put(Sasl.SERVER_AUTH, "true")
-      .put(Sasl.QOP, qop)
-      .build();
-    try {
-      this.saslServer = Sasl.createSaslServer(DIGEST, null, DEFAULT_REALM, saslProps,
-        new DigestCallbackHandler());
-    } catch (SaslException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  /**
-   * Determines whether the authentication exchange has completed successfully.
-   */
-  public synchronized boolean isComplete() {
-    return saslServer != null && saslServer.isComplete();
-  }
-
-  /** Returns the value of a negotiated property. */
-  public Object getNegotiatedProperty(String name) {
-    return saslServer.getNegotiatedProperty(name);
-  }
-
-  /**
-   * Used to respond to client SASL tokens.
-   * @param token Client's SASL token
-   * @return response to send back to the client.
-   */
-  public synchronized byte[] response(byte[] token) {
-    try {
-      return saslServer != null ? saslServer.evaluateResponse(token) : new byte[0];
-    } catch (SaslException e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  /**
-   * Disposes of any system resources or security-sensitive information the
-   * SaslServer might be using.
-   */
-  @Override
-  public synchronized void dispose() {
-    if (saslServer != null) {
-      try {
-        saslServer.dispose();
-      } catch (SaslException e) {
-        // ignore
-      } finally {
-        saslServer = null;
-      }
-    }
-  }
-
-  @Override
-  public byte[] wrap(byte[] data, int offset, int len) throws SaslException {
-    return saslServer.wrap(data, offset, len);
-  }
-
-  @Override
-  public byte[] unwrap(byte[] data, int offset, int len) throws SaslException {
-    return saslServer.unwrap(data, offset, len);
-  }
-
-  /**
-   * Implementation of javax.security.auth.callback.CallbackHandler for SASL DIGEST-MD5 mechanism.
-   */
-  private class DigestCallbackHandler implements CallbackHandler {
-    @Override
-    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
-      for (Callback callback : callbacks) {
-        if (callback instanceof NameCallback) {
-          logger.trace("SASL server callback: setting username");
-          NameCallback nc = (NameCallback) callback;
-          nc.setName(encodeIdentifier(secretKeyHolder.getSaslUser(secretKeyId)));
-        } else if (callback instanceof PasswordCallback) {
-          logger.trace("SASL server callback: setting password");
-          PasswordCallback pc = (PasswordCallback) callback;
-          pc.setPassword(encodePassword(secretKeyHolder.getSecretKey(secretKeyId)));
-        } else if (callback instanceof RealmCallback) {
-          logger.trace("SASL server callback: setting realm");
-          RealmCallback rc = (RealmCallback) callback;
-          rc.setText(rc.getDefaultText());
-        } else if (callback instanceof AuthorizeCallback) {
-          AuthorizeCallback ac = (AuthorizeCallback) callback;
-          String authId = ac.getAuthenticationID();
-          String authzId = ac.getAuthorizationID();
-          ac.setAuthorized(authId.equals(authzId));
-          if (ac.isAuthorized()) {
-            ac.setAuthorizedID(authzId);
-          }
-          logger.debug("SASL Authorization complete, authorized set to {}", ac.isAuthorized());
-        } else {
-          throw new UnsupportedCallbackException(callback, "Unrecognized SASL DIGEST-MD5 Callback");
-        }
-      }
-    }
-  }
-
-  /* Encode a String identifier as a Base64-encoded string. */
-  public static String encodeIdentifier(String identifier) {
-    Preconditions.checkNotNull(identifier, "User cannot be null if SASL is enabled");
-    return Base64.encode(Unpooled.wrappedBuffer(identifier.getBytes(Charsets.UTF_8)))
-      .toString(Charsets.UTF_8);
-  }
-
-  /** Encode a password as a base64-encoded char[] array. */
-  public static char[] encodePassword(String password) {
-    Preconditions.checkNotNull(password, "Password cannot be null if SASL is enabled");
-    return Base64.encode(Unpooled.wrappedBuffer(password.getBytes(Charsets.UTF_8)))
-      .toString(Charsets.UTF_8).toCharArray();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/MessageHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/MessageHandler.java b/network/common/src/main/java/org/apache/spark/network/server/MessageHandler.java
deleted file mode 100644
index 4a1f28e..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/MessageHandler.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import org.apache.spark.network.protocol.Message;
-
-/**
- * Handles either request or response messages coming off of Netty. A MessageHandler instance
- * is associated with a single Netty Channel (though it may have multiple clients on the same
- * Channel.)
- */
-public abstract class MessageHandler<T extends Message> {
-  /** Handles the receipt of a single message. */
-  public abstract void handle(T message) throws Exception;
-
-  /** Invoked when the channel this MessageHandler is on is active. */
-  public abstract void channelActive();
-
-  /** Invoked when an exception was caught on the Channel. */
-  public abstract void exceptionCaught(Throwable cause);
-
-  /** Invoked when the channel this MessageHandler is on is inactive. */
-  public abstract void channelInactive();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java
deleted file mode 100644
index 6ed61da..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/NoOpRpcHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import java.nio.ByteBuffer;
-
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-
-/** An RpcHandler suitable for a client-only TransportContext, which cannot receive RPCs. */
-public class NoOpRpcHandler extends RpcHandler {
-  private final StreamManager streamManager;
-
-  public NoOpRpcHandler() {
-    streamManager = new OneForOneStreamManager();
-  }
-
-  @Override
-  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
-    throw new UnsupportedOperationException("Cannot handle messages");
-  }
-
-  @Override
-  public StreamManager getStreamManager() { return streamManager; }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
deleted file mode 100644
index ea9e735..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Preconditions;
-import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.client.TransportClient;
-
-/**
- * StreamManager which allows registration of an Iterator&lt;ManagedBuffer&gt;, which are individually
- * fetched as chunks by the client. Each registered buffer is one chunk.
- */
-public class OneForOneStreamManager extends StreamManager {
-  private final Logger logger = LoggerFactory.getLogger(OneForOneStreamManager.class);
-
-  private final AtomicLong nextStreamId;
-  private final ConcurrentHashMap<Long, StreamState> streams;
-
-  /** State of a single stream. */
-  private static class StreamState {
-    final String appId;
-    final Iterator<ManagedBuffer> buffers;
-
-    // The channel associated with the stream
-    Channel associatedChannel = null;
-
-    // Used to keep track of the index of the buffer that the user has retrieved, just to ensure
-    // that the caller only requests each chunk one at a time, in order.
-    int curChunk = 0;
-
-    StreamState(String appId, Iterator<ManagedBuffer> buffers) {
-      this.appId = appId;
-      this.buffers = Preconditions.checkNotNull(buffers);
-    }
-  }
-
-  public OneForOneStreamManager() {
-    // For debugging purposes, start with a random stream id to help identify different streams.
-    // This does not need to be globally unique, only unique to this class.
-    nextStreamId = new AtomicLong((long) new Random().nextInt(Integer.MAX_VALUE) * 1000);
-    streams = new ConcurrentHashMap<Long, StreamState>();
-  }
-
-  @Override
-  public void registerChannel(Channel channel, long streamId) {
-    if (streams.containsKey(streamId)) {
-      streams.get(streamId).associatedChannel = channel;
-    }
-  }
-
-  @Override
-  public ManagedBuffer getChunk(long streamId, int chunkIndex) {
-    StreamState state = streams.get(streamId);
-    if (chunkIndex != state.curChunk) {
-      throw new IllegalStateException(String.format(
-        "Received out-of-order chunk index %s (expected %s)", chunkIndex, state.curChunk));
-    } else if (!state.buffers.hasNext()) {
-      throw new IllegalStateException(String.format(
-        "Requested chunk index beyond end %s", chunkIndex));
-    }
-    state.curChunk += 1;
-    ManagedBuffer nextChunk = state.buffers.next();
-
-    if (!state.buffers.hasNext()) {
-      logger.trace("Removing stream id {}", streamId);
-      streams.remove(streamId);
-    }
-
-    return nextChunk;
-  }
-
-  @Override
-  public void connectionTerminated(Channel channel) {
-    // Close all streams which have been associated with the channel.
-    for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
-      StreamState state = entry.getValue();
-      if (state.associatedChannel == channel) {
-        streams.remove(entry.getKey());
-
-        // Release all remaining buffers.
-        while (state.buffers.hasNext()) {
-          state.buffers.next().release();
-        }
-      }
-    }
-  }
-
-  @Override
-  public void checkAuthorization(TransportClient client, long streamId) {
-    if (client.getClientId() != null) {
-      StreamState state = streams.get(streamId);
-      Preconditions.checkArgument(state != null, "Unknown stream ID.");
-      if (!client.getClientId().equals(state.appId)) {
-        throw new SecurityException(String.format(
-          "Client %s not authorized to read stream %d (app %s).",
-          client.getClientId(),
-          streamId,
-          state.appId));
-      }
-    }
-  }
-
-  /**
-   * Registers a stream of ManagedBuffers which are served as individual chunks one at a time to
-   * callers. Each ManagedBuffer will be release()'d after it is transferred on the wire. If a
-   * client connection is closed before the iterator is fully drained, then the remaining buffers
-   * will all be release()'d.
-   *
-   * If an app ID is provided, only callers who've authenticated with the given app ID will be
-   * allowed to fetch from this stream.
-   */
-  public long registerStream(String appId, Iterator<ManagedBuffer> buffers) {
-    long myStreamId = nextStreamId.getAndIncrement();
-    streams.put(myStreamId, new StreamState(appId, buffers));
-    return myStreamId;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java b/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
deleted file mode 100644
index a99c301..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/RpcHandler.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import java.nio.ByteBuffer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-
-/**
- * Handler for sendRPC() messages sent by {@link org.apache.spark.network.client.TransportClient}s.
- */
-public abstract class RpcHandler {
-
-  private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();
-
-  /**
-   * Receive a single RPC message. Any exception thrown while in this method will be sent back to
-   * the client in string form as a standard RPC failure.
-   *
-   * This method will not be called in parallel for a single TransportClient (i.e., channel).
-   *
-   * @param client A channel client which enables the handler to make requests back to the sender
-   *               of this RPC. This will always be the exact same object for a particular channel.
-   * @param message The serialized bytes of the RPC.
-   * @param callback Callback which should be invoked exactly once upon success or failure of the
-   *                 RPC.
-   */
-  public abstract void receive(
-      TransportClient client,
-      ByteBuffer message,
-      RpcResponseCallback callback);
-
-  /**
-   * Returns the StreamManager which contains the state about which streams are currently being
-   * fetched by a TransportClient.
-   */
-  public abstract StreamManager getStreamManager();
-
-  /**
-   * Receives an RPC message that does not expect a reply. The default implementation will
-   * call "{@link #receive(TransportClient, ByteBuffer, RpcResponseCallback)}" and log a warning if
-   * any of the callback methods are called.
-   *
-   * @param client A channel client which enables the handler to make requests back to the sender
-   *               of this RPC. This will always be the exact same object for a particular channel.
-   * @param message The serialized bytes of the RPC.
-   */
-  public void receive(TransportClient client, ByteBuffer message) {
-    receive(client, message, ONE_WAY_CALLBACK);
-  }
-
-  /**
-   * Invoked when the channel associated with the given client is active.
-   */
-  public void channelActive(TransportClient client) { }
-
-  /**
-   * Invoked when the channel associated with the given client is inactive.
-   * No further requests will come from this client.
-   */
-  public void channelInactive(TransportClient client) { }
-
-  public void exceptionCaught(Throwable cause, TransportClient client) { }
-
-  private static class OneWayRpcCallback implements RpcResponseCallback {
-
-    private final Logger logger = LoggerFactory.getLogger(OneWayRpcCallback.class);
-
-    @Override
-    public void onSuccess(ByteBuffer response) {
-      logger.warn("Response provided for one-way RPC.");
-    }
-
-    @Override
-    public void onFailure(Throwable e) {
-      logger.error("Error response provided for one-way RPC.", e);
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
deleted file mode 100644
index 07f161a..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/StreamManager.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import io.netty.channel.Channel;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.client.TransportClient;
-
-/**
- * The StreamManager is used to fetch individual chunks from a stream. This is used in
- * {@link TransportRequestHandler} in order to respond to fetchChunk() requests. Creation of the
- * stream is outside the scope of the transport layer, but a given stream is guaranteed to be read
- * by only one client connection, meaning that getChunk() for a particular stream will be called
- * serially and that once the connection associated with the stream is closed, that stream will
- * never be used again.
- */
-public abstract class StreamManager {
-  /**
-   * Called in response to a fetchChunk() request. The returned buffer will be passed as-is to the
-   * client. A single stream will be associated with a single TCP connection, so this method
-   * will not be called in parallel for a particular stream.
-   *
-   * Chunks may be requested in any order, and requests may be repeated, but it is not required
-   * that implementations support this behavior.
-   *
-   * The returned ManagedBuffer will be release()'d after being written to the network.
-   *
-   * @param streamId id of a stream that has been previously registered with the StreamManager.
-   * @param chunkIndex 0-indexed chunk of the stream that's requested
-   */
-  public abstract ManagedBuffer getChunk(long streamId, int chunkIndex);
-
-  /**
-   * Called in response to a stream() request. The returned data is streamed to the client
-   * through a single TCP connection.
-   *
-   * Note the <code>streamId</code> argument is not related to the similarly named argument in the
-   * {@link #getChunk(long, int)} method.
-   *
-   * @param streamId id of a stream that has been previously registered with the StreamManager.
-   * @return A managed buffer for the stream, or null if the stream was not found.
-   */
-  public ManagedBuffer openStream(String streamId) {
-    throw new UnsupportedOperationException();
-  }
-
-  /**
-   * Associates a stream with a single client connection, which is guaranteed to be the only reader
-   * of the stream. The getChunk() method will be called serially on this connection and once the
-   * connection is closed, the stream will never be used again, enabling cleanup.
-   *
-   * This must be called before the first getChunk() on the stream, but it may be invoked multiple
-   * times with the same channel and stream id.
-   */
-  public void registerChannel(Channel channel, long streamId) { }
-
-  /**
-   * Indicates that the given channel has been terminated. After this occurs, we are guaranteed not
-   * to read from the associated streams again, so any state can be cleaned up.
-   */
-  public void connectionTerminated(Channel channel) { }
-
-  /**
-   * Verify that the client is authorized to read from the given stream.
-   *
-   * @throws SecurityException If client is not authorized.
-   */
-  public void checkAuthorization(TransportClient client, long streamId) { }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
deleted file mode 100644
index 18a9b78..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.client.TransportResponseHandler;
-import org.apache.spark.network.protocol.Message;
-import org.apache.spark.network.protocol.RequestMessage;
-import org.apache.spark.network.protocol.ResponseMessage;
-import org.apache.spark.network.util.NettyUtils;
-
-/**
- * The single Transport-level Channel handler which is used for delegating requests to the
- * {@link TransportRequestHandler} and responses to the {@link TransportResponseHandler}.
- *
- * All channels created in the transport layer are bidirectional. When the Client initiates a Netty
- * Channel with a RequestMessage (which gets handled by the Server's RequestHandler), the Server
- * will produce a ResponseMessage (handled by the Client's ResponseHandler). However, the Server
- * also gets a handle on the same Channel, so it may then begin to send RequestMessages to the
- * Client.
- * This means that the Client also needs a RequestHandler and the Server needs a ResponseHandler,
- * for the Client's responses to the Server's requests.
- *
- * This class also handles timeouts from a {@link io.netty.handler.timeout.IdleStateHandler}.
- * We consider a connection timed out if there are outstanding fetch or RPC requests but no traffic
- * on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; we will not
- * timeout if the client is continuously sending but getting no responses, for simplicity.
- */
-public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
-  private final Logger logger = LoggerFactory.getLogger(TransportChannelHandler.class);
-
-  private final TransportClient client;
-  private final TransportResponseHandler responseHandler;
-  private final TransportRequestHandler requestHandler;
-  private final long requestTimeoutNs;
-  private final boolean closeIdleConnections;
-
-  public TransportChannelHandler(
-      TransportClient client,
-      TransportResponseHandler responseHandler,
-      TransportRequestHandler requestHandler,
-      long requestTimeoutMs,
-      boolean closeIdleConnections) {
-    this.client = client;
-    this.responseHandler = responseHandler;
-    this.requestHandler = requestHandler;
-    this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
-    this.closeIdleConnections = closeIdleConnections;
-  }
-
-  public TransportClient getClient() {
-    return client;
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-    logger.warn("Exception in connection from " + NettyUtils.getRemoteAddress(ctx.channel()),
-      cause);
-    requestHandler.exceptionCaught(cause);
-    responseHandler.exceptionCaught(cause);
-    ctx.close();
-  }
-
-  @Override
-  public void channelActive(ChannelHandlerContext ctx) throws Exception {
-    try {
-      requestHandler.channelActive();
-    } catch (RuntimeException e) {
-      logger.error("Exception from request handler while registering channel", e);
-    }
-    try {
-      responseHandler.channelActive();
-    } catch (RuntimeException e) {
-      logger.error("Exception from response handler while registering channel", e);
-    }
-    super.channelRegistered(ctx);
-  }
-
-  @Override
-  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-    try {
-      requestHandler.channelInactive();
-    } catch (RuntimeException e) {
-      logger.error("Exception from request handler while unregistering channel", e);
-    }
-    try {
-      responseHandler.channelInactive();
-    } catch (RuntimeException e) {
-      logger.error("Exception from response handler while unregistering channel", e);
-    }
-    super.channelUnregistered(ctx);
-  }
-
-  @Override
-  public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
-    if (request instanceof RequestMessage) {
-      requestHandler.handle((RequestMessage) request);
-    } else {
-      responseHandler.handle((ResponseMessage) request);
-    }
-  }
-
-  /** Triggered based on events from an {@link io.netty.handler.timeout.IdleStateHandler}. */
-  @Override
-  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-    if (evt instanceof IdleStateEvent) {
-      IdleStateEvent e = (IdleStateEvent) evt;
-      // See class comment for timeout semantics. In addition to ensuring we only timeout while
-      // there are outstanding requests, we also do a secondary consistency check to ensure
-      // there's no race between the idle timeout and incrementing the numOutstandingRequests
-      // (see SPARK-7003).
-      //
-      // To avoid a race between TransportClientFactory.createClient() and this code which could
-      // result in an inactive client being returned, this needs to run in a synchronized block.
-      synchronized (this) {
-        boolean isActuallyOverdue =
-          System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
-        if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) {
-          if (responseHandler.numOutstandingRequests() > 0) {
-            String address = NettyUtils.getRemoteAddress(ctx.channel());
-            logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
-              "requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
-              "is wrong.", address, requestTimeoutNs / 1000 / 1000);
-            client.timeOut();
-            ctx.close();
-          } else if (closeIdleConnections) {
-            // When closeIdleConnections is enabled, we also close idle connections.
-            client.timeOut();
-            ctx.close();
-          }
-        }
-      }
-    }
-    ctx.fireUserEventTriggered(evt);
-  }
-
-  public TransportResponseHandler getResponseHandler() {
-    return responseHandler;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
deleted file mode 100644
index 296ced3..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import java.nio.ByteBuffer;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NioManagedBuffer;
-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.protocol.ChunkFetchRequest;
-import org.apache.spark.network.protocol.ChunkFetchFailure;
-import org.apache.spark.network.protocol.ChunkFetchSuccess;
-import org.apache.spark.network.protocol.Encodable;
-import org.apache.spark.network.protocol.OneWayMessage;
-import org.apache.spark.network.protocol.RequestMessage;
-import org.apache.spark.network.protocol.RpcFailure;
-import org.apache.spark.network.protocol.RpcRequest;
-import org.apache.spark.network.protocol.RpcResponse;
-import org.apache.spark.network.protocol.StreamFailure;
-import org.apache.spark.network.protocol.StreamRequest;
-import org.apache.spark.network.protocol.StreamResponse;
-import org.apache.spark.network.util.NettyUtils;
-
-/**
- * A handler that processes requests from clients and writes chunk data back. Each handler is
- * attached to a single Netty channel, and keeps track of which streams have been fetched via this
- * channel, in order to clean them up if the channel is terminated (see #channelInactive).
- *
- * The messages should have been processed by the pipeline setup by {@link TransportServer}.
- */
-public class TransportRequestHandler extends MessageHandler<RequestMessage> {
-  private final Logger logger = LoggerFactory.getLogger(TransportRequestHandler.class);
-
-  /** The Netty channel that this handler is associated with. */
-  private final Channel channel;
-
-  /** Client on the same channel allowing us to talk back to the requester. */
-  private final TransportClient reverseClient;
-
-  /** Handles all RPC messages. */
-  private final RpcHandler rpcHandler;
-
-  /** Returns each chunk part of a stream. */
-  private final StreamManager streamManager;
-
-  public TransportRequestHandler(
-      Channel channel,
-      TransportClient reverseClient,
-      RpcHandler rpcHandler) {
-    this.channel = channel;
-    this.reverseClient = reverseClient;
-    this.rpcHandler = rpcHandler;
-    this.streamManager = rpcHandler.getStreamManager();
-  }
-
-  @Override
-  public void exceptionCaught(Throwable cause) {
-    rpcHandler.exceptionCaught(cause, reverseClient);
-  }
-
-  @Override
-  public void channelActive() {
-    rpcHandler.channelActive(reverseClient);
-  }
-
-  @Override
-  public void channelInactive() {
-    if (streamManager != null) {
-      try {
-        streamManager.connectionTerminated(channel);
-      } catch (RuntimeException e) {
-        logger.error("StreamManager connectionTerminated() callback failed.", e);
-      }
-    }
-    rpcHandler.channelInactive(reverseClient);
-  }
-
-  @Override
-  public void handle(RequestMessage request) {
-    if (request instanceof ChunkFetchRequest) {
-      processFetchRequest((ChunkFetchRequest) request);
-    } else if (request instanceof RpcRequest) {
-      processRpcRequest((RpcRequest) request);
-    } else if (request instanceof OneWayMessage) {
-      processOneWayMessage((OneWayMessage) request);
-    } else if (request instanceof StreamRequest) {
-      processStreamRequest((StreamRequest) request);
-    } else {
-      throw new IllegalArgumentException("Unknown request type: " + request);
-    }
-  }
-
-  private void processFetchRequest(final ChunkFetchRequest req) {
-    final String client = NettyUtils.getRemoteAddress(channel);
-
-    logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId);
-
-    ManagedBuffer buf;
-    try {
-      streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
-      streamManager.registerChannel(channel, req.streamChunkId.streamId);
-      buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
-    } catch (Exception e) {
-      logger.error(String.format(
-        "Error opening block %s for request from %s", req.streamChunkId, client), e);
-      respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
-      return;
-    }
-
-    respond(new ChunkFetchSuccess(req.streamChunkId, buf));
-  }
-
-  private void processStreamRequest(final StreamRequest req) {
-    final String client = NettyUtils.getRemoteAddress(channel);
-    ManagedBuffer buf;
-    try {
-      buf = streamManager.openStream(req.streamId);
-    } catch (Exception e) {
-      logger.error(String.format(
-        "Error opening stream %s for request from %s", req.streamId, client), e);
-      respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
-      return;
-    }
-
-    if (buf != null) {
-      respond(new StreamResponse(req.streamId, buf.size(), buf));
-    } else {
-      respond(new StreamFailure(req.streamId, String.format(
-        "Stream '%s' was not found.", req.streamId)));
-    }
-  }
-
-  private void processRpcRequest(final RpcRequest req) {
-    try {
-      rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
-        @Override
-        public void onSuccess(ByteBuffer response) {
-          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
-        }
-
-        @Override
-        public void onFailure(Throwable e) {
-          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
-        }
-      });
-    } catch (Exception e) {
-      logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
-      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
-    } finally {
-      req.body().release();
-    }
-  }
-
-  private void processOneWayMessage(OneWayMessage req) {
-    try {
-      rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
-    } catch (Exception e) {
-      logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
-    } finally {
-      req.body().release();
-    }
-  }
-
-  /**
-   * Responds to a single message with some Encodable object. If a failure occurs while sending,
-   * it will be logged and the channel closed.
-   */
-  private void respond(final Encodable result) {
-    final String remoteAddress = channel.remoteAddress().toString();
-    channel.writeAndFlush(result).addListener(
-      new ChannelFutureListener() {
-        @Override
-        public void operationComplete(ChannelFuture future) throws Exception {
-          if (future.isSuccess()) {
-            logger.trace(String.format("Sent result %s to client %s", result, remoteAddress));
-          } else {
-            logger.error(String.format("Error sending result %s to %s; closing connection",
-              result, remoteAddress), future.cause());
-            channel.close();
-          }
-        }
-      }
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
deleted file mode 100644
index baae235..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import java.io.Closeable;
-import java.net.InetSocketAddress;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import org.apache.spark.network.util.JavaUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.spark.network.TransportContext;
-import org.apache.spark.network.util.IOMode;
-import org.apache.spark.network.util.NettyUtils;
-import org.apache.spark.network.util.TransportConf;
-
-/**
- * Server for the efficient, low-level streaming service.
- */
-public class TransportServer implements Closeable {
-  private final Logger logger = LoggerFactory.getLogger(TransportServer.class);
-
-  private final TransportContext context;
-  private final TransportConf conf;
-  private final RpcHandler appRpcHandler;
-  private final List<TransportServerBootstrap> bootstraps;
-
-  private ServerBootstrap bootstrap;
-  private ChannelFuture channelFuture;
-  private int port = -1;
-
-  /**
-   * Creates a TransportServer that binds to the given host and the given port, or to any available
-   * port if 0. If you don't want to bind to a specific host, set "hostToBind" to null.
-   */
-  public TransportServer(
-      TransportContext context,
-      String hostToBind,
-      int portToBind,
-      RpcHandler appRpcHandler,
-      List<TransportServerBootstrap> bootstraps) {
-    this.context = context;
-    this.conf = context.getConf();
-    this.appRpcHandler = appRpcHandler;
-    this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
-
-    try {
-      init(hostToBind, portToBind);
-    } catch (RuntimeException e) {
-      JavaUtils.closeQuietly(this);
-      throw e;
-    }
-  }
-
-  public int getPort() {
-    if (port == -1) {
-      throw new IllegalStateException("Server not initialized");
-    }
-    return port;
-  }
-
-  private void init(String hostToBind, int portToBind) {
-
-    IOMode ioMode = IOMode.valueOf(conf.ioMode());
-    EventLoopGroup bossGroup =
-      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
-    EventLoopGroup workerGroup = bossGroup;
-
-    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
-      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
-
-    bootstrap = new ServerBootstrap()
-      .group(bossGroup, workerGroup)
-      .channel(NettyUtils.getServerChannelClass(ioMode))
-      .option(ChannelOption.ALLOCATOR, allocator)
-      .childOption(ChannelOption.ALLOCATOR, allocator);
-
-    if (conf.backLog() > 0) {
-      bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
-    }
-
-    if (conf.receiveBuf() > 0) {
-      bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
-    }
-
-    if (conf.sendBuf() > 0) {
-      bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
-    }
-
-    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
-      @Override
-      protected void initChannel(SocketChannel ch) throws Exception {
-        RpcHandler rpcHandler = appRpcHandler;
-        for (TransportServerBootstrap bootstrap : bootstraps) {
-          rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
-        }
-        context.initializePipeline(ch, rpcHandler);
-      }
-    });
-
-    InetSocketAddress address = hostToBind == null ?
-        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
-    channelFuture = bootstrap.bind(address);
-    channelFuture.syncUninterruptibly();
-
-    port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
-    logger.debug("Shuffle server started on port :" + port);
-  }
-
-  @Override
-  public void close() {
-    if (channelFuture != null) {
-      // close is a local operation and should finish within milliseconds; timeout just to be safe
-      channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-      channelFuture = null;
-    }
-    if (bootstrap != null && bootstrap.group() != null) {
-      bootstrap.group().shutdownGracefully();
-    }
-    if (bootstrap != null && bootstrap.childGroup() != null) {
-      bootstrap.childGroup().shutdownGracefully();
-    }
-    bootstrap = null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/server/TransportServerBootstrap.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServerBootstrap.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServerBootstrap.java
deleted file mode 100644
index 05803ab..0000000
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServerBootstrap.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.server;
-
-import io.netty.channel.Channel;
-
-/**
- * A bootstrap which is executed on a TransportServer's client channel once a client connects
- * to the server. This allows customizing the client channel to allow for things such as SASL
- * authentication.
- */
-public interface TransportServerBootstrap {
-  /**
-   * Customizes the channel to include new features, if needed.
-   *
-   * @param channel The connected channel opened by the client.
-   * @param rpcHandler The RPC handler for the server.
-   * @return The RPC handler to use for the channel.
-   */
-  RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/ByteArrayWritableChannel.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/ByteArrayWritableChannel.java b/network/common/src/main/java/org/apache/spark/network/util/ByteArrayWritableChannel.java
deleted file mode 100644
index b141572..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/ByteArrayWritableChannel.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-
-/**
- * A writable channel that stores the written data in a byte array in memory.
- */
-public class ByteArrayWritableChannel implements WritableByteChannel {
-
-  private final byte[] data;
-  private int offset;
-
-  public ByteArrayWritableChannel(int size) {
-    this.data = new byte[size];
-  }
-
-  public byte[] getData() {
-    return data;
-  }
-
-  public int length() {
-    return offset;
-  }
-
-  /** Resets the channel so that writing to it will overwrite the existing buffer. */
-  public void reset() {
-    offset = 0;
-  }
-
-  /**
-   * Reads from the given buffer into the internal byte array.
-   */
-  @Override
-  public int write(ByteBuffer src) {
-    int toTransfer = Math.min(src.remaining(), data.length - offset);
-    src.get(data, offset, toTransfer);
-    offset += toTransfer;
-    return toTransfer;
-  }
-
-  @Override
-  public void close() {
-
-  }
-
-  @Override
-  public boolean isOpen() {
-    return true;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java b/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java
deleted file mode 100644
index a2f0183..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/ByteUnit.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.network.util;
-
-public enum ByteUnit {
-  BYTE (1),
-  KiB (1024L),
-  MiB ((long) Math.pow(1024L, 2L)),
-  GiB ((long) Math.pow(1024L, 3L)),
-  TiB ((long) Math.pow(1024L, 4L)),
-  PiB ((long) Math.pow(1024L, 5L));
-
-  private ByteUnit(long multiplier) {
-    this.multiplier = multiplier;
-  }
-
-  // Interpret the provided number (d) with suffix (u) as this unit type.
-  // E.g. KiB.convertFrom(1, MiB) interprets 1MiB as its KiB representation = 1024k
-  public long convertFrom(long d, ByteUnit u) {
-    return u.convertTo(d, this);
-  }
-
-  // Convert the provided number (d) interpreted as this unit type to unit type (u).
-  public long convertTo(long d, ByteUnit u) {
-    if (multiplier > u.multiplier) {
-      long ratio = multiplier / u.multiplier;
-      if (Long.MAX_VALUE / ratio < d) {
-        throw new IllegalArgumentException("Conversion of " + d + " exceeds Long.MAX_VALUE in "
-          + name() + ". Try a larger unit (e.g. MiB instead of KiB)");
-      }
-      return d * ratio;
-    } else {
-      // Perform operations in this order to avoid potential overflow
-      // when computing d * multiplier
-      return d / (u.multiplier / multiplier);
-    }
-  }
-
-  public double toBytes(long d) {
-    if (d < 0) {
-      throw new IllegalArgumentException("Negative size value. Size must be positive: " + d);
-    }
-    return d * multiplier;
-  }
-
-  public long toKiB(long d) { return convertTo(d, KiB); }
-  public long toMiB(long d) { return convertTo(d, MiB); }
-  public long toGiB(long d) { return convertTo(d, GiB); }
-  public long toTiB(long d) { return convertTo(d, TiB); }
-  public long toPiB(long d) { return convertTo(d, PiB); }
-
-  private final long multiplier;
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/ConfigProvider.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/ConfigProvider.java b/network/common/src/main/java/org/apache/spark/network/util/ConfigProvider.java
deleted file mode 100644
index d944d9d..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/ConfigProvider.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.util.NoSuchElementException;
-
-/**
- * Provides a mechanism for constructing a {@link TransportConf} using some sort of configuration.
- */
-public abstract class ConfigProvider {
-  /** Obtains the value of the given config, throws NoSuchElementException if it doesn't exist. */
-  public abstract String get(String name);
-
-  public String get(String name, String defaultValue) {
-    try {
-      return get(name);
-    } catch (NoSuchElementException e) {
-      return defaultValue;
-    }
-  }
-
-  public int getInt(String name, int defaultValue) {
-    return Integer.parseInt(get(name, Integer.toString(defaultValue)));
-  }
-
-  public long getLong(String name, long defaultValue) {
-    return Long.parseLong(get(name, Long.toString(defaultValue)));
-  }
-
-  public double getDouble(String name, double defaultValue) {
-    return Double.parseDouble(get(name, Double.toString(defaultValue)));
-  }
-
-  public boolean getBoolean(String name, boolean defaultValue) {
-    return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/IOMode.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/IOMode.java b/network/common/src/main/java/org/apache/spark/network/util/IOMode.java
deleted file mode 100644
index 6b208d9..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/IOMode.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-/**
- * Selector for which form of low-level IO we should use.
- * NIO is always available, while EPOLL is only available on Linux.
- * AUTO is used to select EPOLL if it's available, or NIO otherwise.
- */
-public enum IOMode {
-  NIO, EPOLL
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
deleted file mode 100644
index b3d8e0c..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import io.netty.buffer.Unpooled;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * General utilities available in the network package. Many of these are sourced from Spark's
- * own Utils, just accessible within this package.
- */
-public class JavaUtils {
-  private static final Logger logger = LoggerFactory.getLogger(JavaUtils.class);
-
-  /**
-   * Define a default value for driver memory here since this value is referenced across the code
-   * base and nearly all files already use Utils.scala
-   */
-  public static final long DEFAULT_DRIVER_MEM_MB = 1024;
-
-  /** Closes the given object, ignoring IOExceptions. */
-  public static void closeQuietly(Closeable closeable) {
-    try {
-      if (closeable != null) {
-        closeable.close();
-      }
-    } catch (IOException e) {
-      logger.error("IOException should not have been thrown.", e);
-    }
-  }
-
-  /** Returns a hash consistent with Spark's Utils.nonNegativeHash(). */
-  public static int nonNegativeHash(Object obj) {
-    if (obj == null) { return 0; }
-    int hash = obj.hashCode();
-    return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
-  }
-
-  /**
-   * Convert the given string to a byte buffer. The resulting buffer can be
-   * converted back to the same string through {@link #bytesToString(ByteBuffer)}.
-   */
-  public static ByteBuffer stringToBytes(String s) {
-    return Unpooled.wrappedBuffer(s.getBytes(Charsets.UTF_8)).nioBuffer();
-  }
-
-  /**
-   * Convert the given byte buffer to a string. The resulting string can be
-   * converted back to the same byte buffer through {@link #stringToBytes(String)}.
-   */
-  public static String bytesToString(ByteBuffer b) {
-    return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8);
-  }
-
-  /*
-   * Delete a file or directory and its contents recursively.
-   * Don't follow directories if they are symlinks.
-   * Throws an exception if deletion is unsuccessful.
-   */
-  public static void deleteRecursively(File file) throws IOException {
-    if (file == null) { return; }
-
-    if (file.isDirectory() && !isSymlink(file)) {
-      IOException savedIOException = null;
-      for (File child : listFilesSafely(file)) {
-        try {
-          deleteRecursively(child);
-        } catch (IOException e) {
-          // In case of multiple exceptions, only last one will be thrown
-          savedIOException = e;
-        }
-      }
-      if (savedIOException != null) {
-        throw savedIOException;
-      }
-    }
-
-    boolean deleted = file.delete();
-    // Delete can also fail if the file simply did not exist.
-    if (!deleted && file.exists()) {
-      throw new IOException("Failed to delete: " + file.getAbsolutePath());
-    }
-  }
-
-  private static File[] listFilesSafely(File file) throws IOException {
-    if (file.exists()) {
-      File[] files = file.listFiles();
-      if (files == null) {
-        throw new IOException("Failed to list files for dir: " + file);
-      }
-      return files;
-    } else {
-      return new File[0];
-    }
-  }
-
-  private static boolean isSymlink(File file) throws IOException {
-    Preconditions.checkNotNull(file);
-    File fileInCanonicalDir = null;
-    if (file.getParent() == null) {
-      fileInCanonicalDir = file;
-    } else {
-      fileInCanonicalDir = new File(file.getParentFile().getCanonicalFile(), file.getName());
-    }
-    return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
-  }
-
-  private static final ImmutableMap<String, TimeUnit> timeSuffixes =
-    ImmutableMap.<String, TimeUnit>builder()
-      .put("us", TimeUnit.MICROSECONDS)
-      .put("ms", TimeUnit.MILLISECONDS)
-      .put("s", TimeUnit.SECONDS)
-      .put("m", TimeUnit.MINUTES)
-      .put("min", TimeUnit.MINUTES)
-      .put("h", TimeUnit.HOURS)
-      .put("d", TimeUnit.DAYS)
-      .build();
-
-  private static final ImmutableMap<String, ByteUnit> byteSuffixes =
-    ImmutableMap.<String, ByteUnit>builder()
-      .put("b", ByteUnit.BYTE)
-      .put("k", ByteUnit.KiB)
-      .put("kb", ByteUnit.KiB)
-      .put("m", ByteUnit.MiB)
-      .put("mb", ByteUnit.MiB)
-      .put("g", ByteUnit.GiB)
-      .put("gb", ByteUnit.GiB)
-      .put("t", ByteUnit.TiB)
-      .put("tb", ByteUnit.TiB)
-      .put("p", ByteUnit.PiB)
-      .put("pb", ByteUnit.PiB)
-      .build();
-
-  /**
-   * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count for
-   * internal use. If no suffix is provided a direct conversion is attempted.
-   */
-  private static long parseTimeString(String str, TimeUnit unit) {
-    String lower = str.toLowerCase().trim();
-
-    try {
-      Matcher m = Pattern.compile("(-?[0-9]+)([a-z]+)?").matcher(lower);
-      if (!m.matches()) {
-        throw new NumberFormatException("Failed to parse time string: " + str);
-      }
-
-      long val = Long.parseLong(m.group(1));
-      String suffix = m.group(2);
-
-      // Check for invalid suffixes
-      if (suffix != null && !timeSuffixes.containsKey(suffix)) {
-        throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
-      }
-
-      // If suffix is valid use that, otherwise none was provided and use the default passed
-      return unit.convert(val, suffix != null ? timeSuffixes.get(suffix) : unit);
-    } catch (NumberFormatException e) {
-      String timeError = "Time must be specified as seconds (s), " +
-              "milliseconds (ms), microseconds (us), minutes (m or min), hour (h), or day (d). " +
-              "E.g. 50s, 100ms, or 250us.";
-
-      throw new NumberFormatException(timeError + "\n" + e.getMessage());
-    }
-  }
-
-  /**
-   * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If
-   * no suffix is provided, the passed number is assumed to be in ms.
-   */
-  public static long timeStringAsMs(String str) {
-    return parseTimeString(str, TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. If
-   * no suffix is provided, the passed number is assumed to be in seconds.
-   */
-  public static long timeStringAsSec(String str) {
-    return parseTimeString(str, TimeUnit.SECONDS);
-  }
-
-  /**
-   * Convert a passed byte string (e.g. 50b, 100kb, or 250mb) to a ByteUnit for
-   * internal use. If no suffix is provided a direct conversion of the provided default is
-   * attempted.
-   */
-  private static long parseByteString(String str, ByteUnit unit) {
-    String lower = str.toLowerCase().trim();
-
-    try {
-      Matcher m = Pattern.compile("([0-9]+)([a-z]+)?").matcher(lower);
-      Matcher fractionMatcher = Pattern.compile("([0-9]+\\.[0-9]+)([a-z]+)?").matcher(lower);
-
-      if (m.matches()) {
-        long val = Long.parseLong(m.group(1));
-        String suffix = m.group(2);
-
-        // Check for invalid suffixes
-        if (suffix != null && !byteSuffixes.containsKey(suffix)) {
-          throw new NumberFormatException("Invalid suffix: \"" + suffix + "\"");
-        }
-
-        // If suffix is valid use that, otherwise none was provided and use the default passed
-        return unit.convertFrom(val, suffix != null ? byteSuffixes.get(suffix) : unit);
-      } else if (fractionMatcher.matches()) {
-        throw new NumberFormatException("Fractional values are not supported. Input was: "
-          + fractionMatcher.group(1));
-      } else {
-        throw new NumberFormatException("Failed to parse byte string: " + str);
-      }
-
-    } catch (NumberFormatException e) {
-      String timeError = "Size must be specified as bytes (b), " +
-        "kibibytes (k), mebibytes (m), gibibytes (g), tebibytes (t), or pebibytes(p). " +
-        "E.g. 50b, 100k, or 250m.";
-
-      throw new NumberFormatException(timeError + "\n" + e.getMessage());
-    }
-  }
-
-  /**
-   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to bytes for
-   * internal use.
-   *
-   * If no suffix is provided, the passed number is assumed to be in bytes.
-   */
-  public static long byteStringAsBytes(String str) {
-    return parseByteString(str, ByteUnit.BYTE);
-  }
-
-  /**
-   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to kibibytes for
-   * internal use.
-   *
-   * If no suffix is provided, the passed number is assumed to be in kibibytes.
-   */
-  public static long byteStringAsKb(String str) {
-    return parseByteString(str, ByteUnit.KiB);
-  }
-
-  /**
-   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to mebibytes for
-   * internal use.
-   *
-   * If no suffix is provided, the passed number is assumed to be in mebibytes.
-   */
-  public static long byteStringAsMb(String str) {
-    return parseByteString(str, ByteUnit.MiB);
-  }
-
-  /**
-   * Convert a passed byte string (e.g. 50b, 100k, or 250m) to gibibytes for
-   * internal use.
-   *
-   * If no suffix is provided, the passed number is assumed to be in gibibytes.
-   */
-  public static long byteStringAsGb(String str) {
-    return parseByteString(str, ByteUnit.GiB);
-  }
-
-  /**
-   * Returns a byte array with the buffer's contents, trying to avoid copying the data if
-   * possible.
-   */
-  public static byte[] bufferToArray(ByteBuffer buffer) {
-    if (buffer.hasArray() && buffer.arrayOffset() == 0 &&
-        buffer.array().length == buffer.remaining()) {
-      return buffer.array();
-    } else {
-      byte[] bytes = new byte[buffer.remaining()];
-      buffer.get(bytes);
-      return bytes;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java b/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
deleted file mode 100644
index 922c37a..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Based on LimitedInputStream.java from Google Guava
- *
- * Copyright (C) 2007 The Guava Authors
- *
- *    Licensed under the Apache License, Version 2.0 (the "License");
- *    you may not use this file except in compliance with the License.
- *    You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Wraps a {@link InputStream}, limiting the number of bytes which can be read.
- *
- * This code is from Guava's 14.0 source code, because there is no compatible way to
- * use this functionality in both a Guava 11 environment and a Guava &gt;14 environment.
- */
-public final class LimitedInputStream extends FilterInputStream {
-  private long left;
-  private long mark = -1;
-
-  public LimitedInputStream(InputStream in, long limit) {
-    super(in);
-    Preconditions.checkNotNull(in);
-    Preconditions.checkArgument(limit >= 0, "limit must be non-negative");
-    left = limit;
-  }
-  @Override public int available() throws IOException {
-    return (int) Math.min(in.available(), left);
-  }
-  // it's okay to mark even if mark isn't supported, as reset won't work
-  @Override public synchronized void mark(int readLimit) {
-    in.mark(readLimit);
-    mark = left;
-  }
-  @Override public int read() throws IOException {
-    if (left == 0) {
-      return -1;
-    }
-    int result = in.read();
-    if (result != -1) {
-      --left;
-    }
-    return result;
-  }
-  @Override public int read(byte[] b, int off, int len) throws IOException {
-    if (left == 0) {
-      return -1;
-    }
-    len = (int) Math.min(len, left);
-    int result = in.read(b, off, len);
-    if (result != -1) {
-      left -= result;
-    }
-    return result;
-  }
-  @Override public synchronized void reset() throws IOException {
-    if (!in.markSupported()) {
-      throw new IOException("Mark not supported");
-    }
-    if (mark == -1) {
-      throw new IOException("Mark not set");
-    }
-    in.reset();
-    left = mark;
-  }
-  @Override public long skip(long n) throws IOException {
-    n = Math.min(n, left);
-    long skipped = in.skip(n);
-    left -= skipped;
-    return skipped;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java b/network/common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
deleted file mode 100644
index 668d235..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/MapConfigProvider.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import com.google.common.collect.Maps;
-
-import java.util.Map;
-import java.util.NoSuchElementException;
-
-/** ConfigProvider based on a Map (copied in the constructor). */
-public class MapConfigProvider extends ConfigProvider {
-  private final Map<String, String> config;
-
-  public MapConfigProvider(Map<String, String> config) {
-    this.config = Maps.newHashMap(config);
-  }
-
-  @Override
-  public String get(String name) {
-    String value = config.get(name);
-    if (value == null) {
-      throw new NoSuchElementException(name);
-    }
-    return value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java b/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
deleted file mode 100644
index caa7260..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/NettyUtils.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.lang.reflect.Field;
-import java.util.concurrent.ThreadFactory;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.ByteToMessageDecoder;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.util.internal.PlatformDependent;
-
-/**
- * Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO.
- */
-public class NettyUtils {
-  /** Creates a new ThreadFactory which prefixes each thread with the given name. */
-  public static ThreadFactory createThreadFactory(String threadPoolPrefix) {
-    return new ThreadFactoryBuilder()
-      .setDaemon(true)
-      .setNameFormat(threadPoolPrefix + "-%d")
-      .build();
-  }
-
-  /** Creates a Netty EventLoopGroup based on the IOMode. */
-  public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
-    ThreadFactory threadFactory = createThreadFactory(threadPrefix);
-
-    switch (mode) {
-      case NIO:
-        return new NioEventLoopGroup(numThreads, threadFactory);
-      case EPOLL:
-        return new EpollEventLoopGroup(numThreads, threadFactory);
-      default:
-        throw new IllegalArgumentException("Unknown io mode: " + mode);
-    }
-  }
-
-  /** Returns the correct (client) SocketChannel class based on IOMode. */
-  public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
-    switch (mode) {
-      case NIO:
-        return NioSocketChannel.class;
-      case EPOLL:
-        return EpollSocketChannel.class;
-      default:
-        throw new IllegalArgumentException("Unknown io mode: " + mode);
-    }
-  }
-
-  /** Returns the correct ServerSocketChannel class based on IOMode. */
-  public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
-    switch (mode) {
-      case NIO:
-        return NioServerSocketChannel.class;
-      case EPOLL:
-        return EpollServerSocketChannel.class;
-      default:
-        throw new IllegalArgumentException("Unknown io mode: " + mode);
-    }
-  }
-
-  /**
-   * Creates a LengthFieldBasedFrameDecoder where the first 8 bytes are the length of the frame.
-   * This is used before all decoders.
-   */
-  public static TransportFrameDecoder createFrameDecoder() {
-    return new TransportFrameDecoder();
-  }
-
-  /** Returns the remote address on the channel or "&lt;unknown remote&gt;" if none exists. */
-  public static String getRemoteAddress(Channel channel) {
-    if (channel != null && channel.remoteAddress() != null) {
-      return channel.remoteAddress().toString();
-    }
-    return "<unknown remote>";
-  }
-
-  /**
-   * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
-   * are disabled for TransportClients because the ByteBufs are allocated by the event loop thread,
-   * but released by the executor thread rather than the event loop thread. Those thread-local
-   * caches actually delay the recycling of buffers, leading to larger memory usage.
-   */
-  public static PooledByteBufAllocator createPooledByteBufAllocator(
-      boolean allowDirectBufs,
-      boolean allowCache,
-      int numCores) {
-    if (numCores == 0) {
-      numCores = Runtime.getRuntime().availableProcessors();
-    }
-    return new PooledByteBufAllocator(
-      allowDirectBufs && PlatformDependent.directBufferPreferred(),
-      Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
-      Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
-      getPrivateStaticField("DEFAULT_PAGE_SIZE"),
-      getPrivateStaticField("DEFAULT_MAX_ORDER"),
-      allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
-      allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
-      allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
-    );
-  }
-
-  /** Used to get defaults from Netty's private static fields. */
-  private static int getPrivateStaticField(String name) {
-    try {
-      Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
-      f.setAccessible(true);
-      return f.getInt(null);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e01dcc6/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java b/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java
deleted file mode 100644
index 5f20b70..0000000
--- a/network/common/src/main/java/org/apache/spark/network/util/SystemPropertyConfigProvider.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.util;
-
-import java.util.NoSuchElementException;
-
-import org.apache.spark.network.util.ConfigProvider;
-
-/** Uses System properties to obtain config values. */
-public class SystemPropertyConfigProvider extends ConfigProvider {
-  @Override
-  public String get(String name) {
-    String value = System.getProperty(name);
-    if (value == null) {
-      throw new NoSuchElementException(name);
-    }
-    return value;
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message