asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mhub...@apache.org
Subject [2/4] asterixdb git commit: Introduce MessagingNetworkManager for NC2NC AppMessaging
Date Fri, 26 Aug 2016 14:38:30 GMT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
new file mode 100644
index 0000000..fba0eaf
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/ICloseableBufferAcceptor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+/**
+ * A buffer acceptor that can be closed to indicate end of transmission or an error code
+ * specified to indicate an error in transmission.
+ *
+ * @author vinayakb
+ */
+public interface ICloseableBufferAcceptor extends IBufferAcceptor {
+    /**
+     * Close the buffer acceptor.
+     */
+    public void close();
+
+    /**
+     * Indicate that an error occurred.
+     *
+     * @param ecode
+     *            - the error code.
+     */
+    public void error(int ecode);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IConnectionWriterState.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IConnectionWriterState.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IConnectionWriterState.java
new file mode 100644
index 0000000..1f4f204
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IConnectionWriterState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+public interface IConnectionWriterState {
+
+    /**
+     * Resets the connection write state based on the passed parameters.
+     *
+     * @param pendingBuffer
+     * @param pendingWriteSize
+     * @param ccb
+     */
+    public void reset(ByteBuffer pendingBuffer, int pendingWriteSize, IChannelControlBlock ccb);
+
+    /**
+     * @return The command of this {@link IConnectionWriterState}
+     */
+    public MuxDemuxCommand getCommand();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/MuxDemuxCommand.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/MuxDemuxCommand.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/MuxDemuxCommand.java
new file mode 100644
index 0000000..22b77f7
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/MuxDemuxCommand.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.comm;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.NetException;
+
+public class MuxDemuxCommand {
+    public static final int MAX_CHANNEL_ID = Integer.MAX_VALUE - 1;
+
+    public static final int COMMAND_SIZE = 8;
+
+    public static final int MAX_DATA_VALUE = 0x1fffffff;
+
+    public enum CommandType {
+        OPEN_CHANNEL,
+        CLOSE_CHANNEL,
+        CLOSE_CHANNEL_ACK,
+        ERROR,
+        ADD_CREDITS,
+        DATA,
+    }
+
+    private int channelId;
+
+    private CommandType type;
+
+    private int data;
+
+    public int getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(int channelId) throws NetException {
+        if (channelId > MAX_CHANNEL_ID) {
+            throw new NetException("channelId " + channelId + " exceeds " + MAX_CHANNEL_ID);
+        }
+        this.channelId = channelId;
+    }
+
+    public CommandType getCommandType() {
+        return type;
+    }
+
+    public void setCommandType(CommandType type) {
+        this.type = type;
+    }
+
+    public int getData() {
+        return data;
+    }
+
+    public void setData(int data) throws NetException {
+        if (data > MAX_DATA_VALUE) {
+            throw new NetException("data " + data + " exceeds " + MAX_DATA_VALUE);
+        }
+        this.data = data;
+    }
+
+    public void write(ByteBuffer buffer) {
+        long cmd = (((long) channelId) << 32) | (((long) type.ordinal()) << 29) | (data & 0x1fffffff);
+        buffer.putLong(cmd);
+    }
+
+    public void read(ByteBuffer buffer) {
+        long cmd = buffer.getLong();
+        channelId = (int) ((cmd >> 32) & 0x7fffffff);
+        type = CommandType.values()[(int) ((cmd >> 29) & 0x7)];
+        data = (int) (cmd & 0x1fffffff);
+    }
+
+    @Override
+    public String toString() {
+        return channelId + ":" + type + ":" + data;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NetException.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NetException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NetException.java
new file mode 100644
index 0000000..eaf67af
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NetException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.exceptions;
+
+public class NetException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public NetException() {
+        // empty constructor
+    }
+
+    public NetException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public NetException(String message) {
+        super(message);
+    }
+
+    public NetException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
index 6f42410..c238ae3 100644
--- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
@@ -22,9 +22,10 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 
+import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
-import org.apache.hyracks.net.exceptions.NetException;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemux;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
@@ -38,7 +39,7 @@ public class ClientNetworkManager implements IChannelConnectionFactory {
         /* This is a connect only socket and does not listen to any incoming connections, so pass null to
          * localAddress and listener.
          */
-        md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS);
+        md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS, FullFrameChannelInterfaceFactory.INSTANCE);
     }
 
     public void start() throws IOException {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
index fba7cf5..e3c6f4a 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/DatasetNetworkInputChannel.java
@@ -27,13 +27,13 @@ import java.util.logging.Logger;
 
 import org.apache.hyracks.api.channels.IInputChannel;
 import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.context.IHyracksCommonContext;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
-import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
 public class DatasetNetworkInputChannel implements IInputChannel {
     private static final Logger LOGGER = Logger.getLogger(DatasetNetworkInputChannel.class.getName());
@@ -54,7 +54,7 @@ public class DatasetNetworkInputChannel implements IInputChannel {
 
     private final int nBuffers;
 
-    private ChannelControlBlock ccb;
+    private IChannelControlBlock ccb;
 
     private IInputChannelMonitor monitor;
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/IChannelConnectionFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/IChannelConnectionFactory.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/IChannelConnectionFactory.java
index 2e66f54..fbcb0da 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/IChannelConnectionFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/IChannelConnectionFactory.java
@@ -20,9 +20,9 @@ package org.apache.hyracks.comm.channels;
 
 import java.net.SocketAddress;
 
-import org.apache.hyracks.net.exceptions.NetException;
-import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.exceptions.NetException;
 
 public interface IChannelConnectionFactory {
-    public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException;
+    public IChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
index 235536f..a846da3 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
@@ -27,12 +27,12 @@ import java.util.logging.Logger;
 
 import org.apache.hyracks.api.channels.IInputChannel;
 import org.apache.hyracks.api.channels.IInputChannelMonitor;
+import org.apache.hyracks.api.comm.IBufferAcceptor;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.context.IHyracksCommonContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.partitions.PartitionId;
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
-import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
 public class NetworkInputChannel implements IInputChannel {
     private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
@@ -49,7 +49,7 @@ public class NetworkInputChannel implements IInputChannel {
 
     private final int nBuffers;
 
-    private ChannelControlBlock ccb;
+    private IChannelControlBlock ccb;
 
     private IInputChannelMonitor monitor;
 
@@ -99,7 +99,8 @@ public class NetworkInputChannel implements IInputChannel {
         }
         ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
         ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
-        ccb.getReadInterface().setBufferFactory(new ReadBufferFactory(nBuffers, ctx), nBuffers, ctx.getInitialFrameSize());
+        ccb.getReadInterface().setBufferFactory(new ReadBufferFactory(nBuffers, ctx), nBuffers,
+                ctx.getInitialFrameSize());
         ByteBuffer writeBuffer = ByteBuffer.allocate(INITIAL_MESSAGE_SIZE);
         writeBuffer.putLong(partitionId.getJobId().getId());
         writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
index 46220de..60e2e35 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkOutputChannel.java
@@ -22,9 +22,9 @@ import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Deque;
 
+import org.apache.hyracks.api.comm.IBufferAcceptor;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.net.buffers.IBufferAcceptor;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 
 public class NetworkOutputChannel implements IFrameWriter {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
index 3aa77b9..876b3de 100644
--- a/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
@@ -20,12 +20,9 @@ package org.apache.hyracks.comm.channels;
 
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.context.IHyracksCommonContext;
-import org.apache.hyracks.net.protocols.muxdemux.IBufferFactory;
 
-/**
- * @author yingyib
- */
 public class ReadBufferFactory implements IBufferFactory {
 
     private final int limit;
@@ -39,17 +36,12 @@ public class ReadBufferFactory implements IBufferFactory {
 
     @Override
     public ByteBuffer createBuffer() {
-        try {
-            if (counter >= limit) {
-                return null;
-            } else {
-                ByteBuffer frame = ByteBuffer.allocate(frameSize);
-                counter++;
-                return frame;
-            }
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
+        if (counter >= limit) {
+            return null;
+        } else {
+            ByteBuffer frame = ByteBuffer.allocate(frameSize);
+            counter++;
+            return frame;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index d93ba0d..ff5832a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -23,10 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.base.INodeController;
@@ -35,6 +31,9 @@ import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatData;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema;
 import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema.GarbageCollectorInfo;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
 
 public class NodeControllerState {
     private static final int RRD_SIZE = 720;
@@ -47,6 +46,8 @@ public class NodeControllerState {
 
     private final NetworkAddress datasetPort;
 
+    private final NetworkAddress messagingPort;
+
     private final Set<JobId> activeJobIds;
 
     private final String osName;
@@ -142,6 +143,7 @@ public class NodeControllerState {
         ncConfig = reg.getNCConfig();
         dataPort = reg.getDataPort();
         datasetPort = reg.getDatasetPort();
+        messagingPort = reg.getMessagingPort();
         activeJobIds = new HashSet<JobId>();
 
         osName = reg.getOSName();
@@ -264,6 +266,10 @@ public class NodeControllerState {
         return datasetPort;
     }
 
+    public NetworkAddress getMessagingPort() {
+        return messagingPort;
+    }
+
     public JSONObject toSummaryJSON() throws JSONException {
         JSONObject o = new JSONObject();
         o.put("node-id", ncConfig.nodeId);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index 785a202..726bf12 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -40,11 +40,11 @@ public class GetNodeControllersInfoWork extends AbstractWork {
 
     @Override
     public void run() {
-        Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
+        Map<String, NodeControllerInfo> result = new LinkedHashMap<>();
         Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
         for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
-            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort(), e
-                    .getValue().getDatasetPort()));
+            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort(),
+                    e.getValue().getDatasetPort(), e.getValue().getMessagingPort()));
         }
         callback.setValue(result);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
index d08df60..64b9737 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java
@@ -18,6 +18,12 @@
  */
 package org.apache.hyracks.control.common.controllers;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hyracks.api.application.IApplicationConfig;
 import org.apache.hyracks.control.common.application.IniApplicationConfig;
 import org.ini4j.Ini;
@@ -25,12 +31,6 @@ import org.kohsuke.args4j.Argument;
 import org.kohsuke.args4j.Option;
 import org.kohsuke.args4j.spi.StopOptionHandler;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.Map;
-
 public class NCConfig implements Serializable {
     private static final long serialVersionUID = 2L;
 
@@ -82,7 +82,7 @@ public class NCConfig implements Serializable {
     @Option(name = "-result-public-port", usage = "Public IP port to announce dataset result distribution listener (default: same as -result-port; must set -result-public-ip-address also)", required = false)
     public int resultPublicPort = 0;
 
-    @Option(name = "-retries", usage ="Number of attempts to contact CC before giving up (default = 5)")
+    @Option(name = "-retries", usage = "Number of attempts to contact CC before giving up (default = 5)")
     public int retries = 5;
 
     @Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
@@ -112,6 +112,23 @@ public class NCConfig implements Serializable {
     @Option(name = "-config-file", usage = "Specify path to local configuration file (default: no local config)", required = false)
     public String configFile = null;
 
+    //TODO add messaging values to NC start scripts
+    @Option(name = "-messaging-ip-address", usage = "IP Address to bind messaging "
+            + "listener (default: same as -address)", required = false)
+    public String messagingIPAddress;
+
+    @Option(name = "-messaging-port", usage = "IP port to bind messaging listener "
+            + "(default: random port)", required = false)
+    public int messagingPort = 0;
+
+    @Option(name = "-messaging-public-ip-address", usage = "Public IP Address to announce messaging"
+            + " listener (default: same as -messaging-ip-address)", required = false)
+    public String messagingPublicIPAddress;
+
+    @Option(name = "-messaging-public-port", usage = "Public IP port to announce messaging listener"
+            + " (default: same as -messaging-port; must set -messaging-public-port also)", required = false)
+    public int messagingPublicPort = 0;
+
     @Argument
     @Option(name = "--", handler = StopOptionHandler.class)
     public List<String> appArgs;
@@ -139,13 +156,14 @@ public class NCConfig implements Serializable {
         resultIPAddress = IniUtils.getString(ini, nodeSection, "result.address", resultIPAddress);
         resultPort = IniUtils.getInt(ini, nodeSection, "result.port", resultPort);
 
-        clusterNetPublicIPAddress = IniUtils.getString(
-                ini, nodeSection, "public.cluster.address", clusterNetPublicIPAddress);
+        clusterNetPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.cluster.address",
+                clusterNetPublicIPAddress);
         clusterNetPublicPort = IniUtils.getInt(ini, nodeSection, "public.cluster.port", clusterNetPublicPort);
         dataPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.data.address", dataPublicIPAddress);
         dataPublicPort = IniUtils.getInt(ini, nodeSection, "public.data.port", dataPublicPort);
         resultPublicIPAddress = IniUtils.getString(ini, nodeSection, "public.result.address", resultPublicIPAddress);
         resultPublicPort = IniUtils.getInt(ini, nodeSection, "public.result.port", resultPublicPort);
+        //TODO pass messaging info from ini file
 
         retries = IniUtils.getInt(ini, nodeSection, "retries", retries);
 
@@ -169,23 +187,41 @@ public class NCConfig implements Serializable {
         }
 
         // "address" is the default for all IP addresses
-        if (clusterNetIPAddress == null) clusterNetIPAddress = ipAddress;
-        if (dataIPAddress == null) dataIPAddress = ipAddress;
-        if (resultIPAddress == null) resultIPAddress = ipAddress;
+        if (clusterNetIPAddress == null) {
+            clusterNetIPAddress = ipAddress;
+        }
+        if (dataIPAddress == null) {
+            dataIPAddress = ipAddress;
+        }
+        if (resultIPAddress == null) {
+            resultIPAddress = ipAddress;
+        }
 
         // All "public" options default to their "non-public" versions
-        if (clusterNetPublicIPAddress == null) clusterNetPublicIPAddress = clusterNetIPAddress;
-        if (clusterNetPublicPort == 0) clusterNetPublicPort = clusterNetPort;
-        if (dataPublicIPAddress == null) dataPublicIPAddress = dataIPAddress;
-        if (dataPublicPort == 0) dataPublicPort = dataPort;
-        if (resultPublicIPAddress == null) resultPublicIPAddress = resultIPAddress;
-        if (resultPublicPort == 0) resultPublicPort = resultPort;
+        if (clusterNetPublicIPAddress == null) {
+            clusterNetPublicIPAddress = clusterNetIPAddress;
+        }
+        if (clusterNetPublicPort == 0) {
+            clusterNetPublicPort = clusterNetPort;
+        }
+        if (dataPublicIPAddress == null) {
+            dataPublicIPAddress = dataIPAddress;
+        }
+        if (dataPublicPort == 0) {
+            dataPublicPort = dataPort;
+        }
+        if (resultPublicIPAddress == null) {
+            resultPublicIPAddress = resultIPAddress;
+        }
+        if (resultPublicPort == 0) {
+            resultPublicPort = resultPort;
+        }
     }
 
     /**
      * @return An IApplicationConfig representing this NCConfig.
-     * Note: Currently this only includes the values from the configuration
-     * file, not anything specified on the command-line. QQQ
+     *         Note: Currently this only includes the values from the configuration
+     *         file, not anything specified on the command-line. QQQ
      */
     public IApplicationConfig getAppConfig() {
         return new IniApplicationConfig(ini);
@@ -215,10 +251,12 @@ public class NCConfig implements Serializable {
         configuration.put("result-time-to-live", String.valueOf(resultTTL));
         configuration.put("result-sweep-threshold", String.valueOf(resultSweepThreshold));
         configuration.put("result-manager-memory", String.valueOf(resultManagerMemory));
-
+        configuration.put("messaging-ip-address", messagingIPAddress);
+        configuration.put("messaging-port", String.valueOf(messagingPort));
+        configuration.put("messaging-public-ip-address", messagingPublicIPAddress);
+        configuration.put("messaging-public-port", String.valueOf(messagingPublicPort));
         if (appNCMainClass != null) {
             configuration.put("app-nc-main-class", appNCMainClass);
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
index 5a23455..bb8022e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java
@@ -65,10 +65,13 @@ public final class NodeRegistration implements Serializable {
 
     private final HeartbeatSchema hbSchema;
 
+    private final NetworkAddress messagingPort;
+
     public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
             NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors, String vmName,
             String vmVersion, String vmVendor, String classpath, String libraryPath, String bootClasspath,
-            List<String> inputArguments, Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
+            List<String> inputArguments, Map<String, String> systemProperties, HeartbeatSchema hbSchema,
+            NetworkAddress messagingPort) {
         this.ncAddress = ncAddress;
         this.nodeId = nodeId;
         this.ncConfig = ncConfig;
@@ -87,6 +90,7 @@ public final class NodeRegistration implements Serializable {
         this.inputArguments = inputArguments;
         this.systemProperties = systemProperties;
         this.hbSchema = hbSchema;
+        this.messagingPort = messagingPort;
     }
 
     public InetSocketAddress getNodeControllerAddress() {
@@ -160,4 +164,8 @@ public final class NodeRegistration implements Serializable {
     public Map<String, String> getSystemProperties() {
         return systemProperties;
     }
+
+    public NetworkAddress getMessagingPort() {
+        return messagingPort;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
index b015e3d..2d0a49d 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCDriver.java
@@ -18,12 +18,12 @@
  */
 package org.apache.hyracks.control.nc;
 
-import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.kohsuke.args4j.CmdLineParser;
-
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.kohsuke.args4j.CmdLineParser;
+
 public class NCDriver {
     private static final Logger LOGGER = Logger.getLogger(NCDriver.class.getName());
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 43cac74..ac994a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -73,6 +73,7 @@ import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.control.nc.io.profiling.IIOCounter;
 import org.apache.hyracks.control.nc.io.profiling.IOCounterFactory;
 import org.apache.hyracks.control.nc.net.DatasetNetworkManager;
+import org.apache.hyracks.control.nc.net.MessagingNetworkManager;
 import org.apache.hyracks.control.nc.net.NetworkManager;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
@@ -91,6 +92,7 @@ import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.api.IPCPerformanceCounters;
 import org.apache.hyracks.ipc.impl.IPCSystem;
+import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 
 public class NodeControllerService implements IControllerService {
@@ -158,6 +160,8 @@ public class NodeControllerService implements IControllerService {
 
     private IIOCounter ioCounter;
 
+    private MessagingNetworkManager messagingNetManager;
+
     public NodeControllerService(NCConfig ncConfig) throws Exception {
         this.ncConfig = ncConfig;
         id = ncConfig.nodeId;
@@ -171,7 +175,8 @@ public class NodeControllerService implements IControllerService {
         }
         partitionManager = new PartitionManager(this);
         netManager = new NetworkManager(ncConfig.dataIPAddress, ncConfig.dataPort, partitionManager,
-                ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort);
+                ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.dataPublicIPAddress, ncConfig.dataPublicPort,
+                FullFrameChannelInterfaceFactory.INSTANCE);
 
         lccm = new LifeCycleComponentManager();
         queue = new WorkQueue(Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
@@ -247,7 +252,12 @@ public class NodeControllerService implements IControllerService {
                 ncConfig.resultTTL, ncConfig.resultSweepThreshold);
         datasetNetworkManager = new DatasetNetworkManager(ncConfig.resultIPAddress, ncConfig.resultPort,
                 datasetPartitionManager, ncConfig.nNetThreads, ncConfig.nNetBuffers, ncConfig.resultPublicIPAddress,
-                ncConfig.resultPublicPort);
+                ncConfig.resultPublicPort, FullFrameChannelInterfaceFactory.INSTANCE);
+        if (ncConfig.messagingIPAddress != null && appCtx.getMessagingChannelInterfaceFactory() != null) {
+            messagingNetManager = new MessagingNetworkManager(this, ncConfig.messagingIPAddress, ncConfig.messagingPort,
+                    ncConfig.nNetThreads, ncConfig.messagingPublicIPAddress, ncConfig.messagingPublicPort,
+                    appCtx.getMessagingChannelInterfaceFactory());
+        }
     }
 
     @Override
@@ -260,8 +270,11 @@ public class NodeControllerService implements IControllerService {
         init();
 
         datasetNetworkManager.start();
-        IIPCHandle ccIPCHandle =
-                ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
+        if (messagingNetManager != null) {
+            messagingNetManager.start();
+        }
+        IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort),
+                ncConfig.retries);
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
         for (int i = 0; i < gcInfos.length; ++i) {
@@ -271,11 +284,13 @@ public class NodeControllerService implements IControllerService {
         // Use "public" versions of network addresses and ports
         NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
         NetworkAddress netAddress = netManager.getPublicNetworkAddress();
+        NetworkAddress meesagingPort = messagingNetManager != null ? messagingNetManager.getPublicNetworkAddress()
+                : null;
         ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
                 osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
                 runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
                 runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
-                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
+                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema, meesagingPort));
 
         synchronized (this) {
             while (registrationPending) {
@@ -338,6 +353,9 @@ public class NodeControllerService implements IControllerService {
             datasetPartitionManager.close();
             netManager.stop();
             datasetNetworkManager.stop();
+            if (messagingNetManager != null) {
+                messagingNetManager.stop();
+            }
             queue.stop();
             if (ncAppEntryPoint != null) {
                 ncAppEntryPoint.stop();
@@ -490,8 +508,7 @@ public class NodeControllerService implements IControllerService {
             CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
             switch (fn.getFunctionId()) {
                 case SEND_APPLICATION_MESSAGE: {
-                    CCNCFunctions.SendApplicationMessageFunction amf =
-                            (CCNCFunctions.SendApplicationMessageFunction) fn;
+                    CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
                     queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
                             amf.getDeploymentId(), amf.getNodeId()));
                     return;
@@ -516,8 +533,7 @@ public class NodeControllerService implements IControllerService {
                 }
 
                 case REPORT_PARTITION_AVAILABILITY: {
-                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
-                            (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
+                    CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
                     queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
                             rpaf.getPartitionId(), rpaf.getNetworkAddress()));
                     return;
@@ -530,8 +546,7 @@ public class NodeControllerService implements IControllerService {
                 }
 
                 case GET_NODE_CONTROLLERS_INFO_RESPONSE: {
-                    CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf =
-                            (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
+                    CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
                     setNodeControllersInfo(gncirf.getNodeControllerInfos());
                     return;
                 }
@@ -572,6 +587,10 @@ public class NodeControllerService implements IControllerService {
         return datasetPartitionManager;
     }
 
+    public MessagingNetworkManager getMessagingNetworkManager() {
+        return messagingNetManager;
+    }
+
     /**
      * Shutdown hook that invokes {@link NCApplicationEntryPoint#stop() stop} method.
      */

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
index d23c701..a2c3f4c 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCApplicationContext.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
 import org.apache.hyracks.api.application.IApplicationConfig;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.api.application.IStateDumpHandler;
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
 import org.apache.hyracks.api.context.IHyracksRootContext;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.resources.memory.IMemoryManager;
@@ -42,10 +43,11 @@ public class NCApplicationContext extends ApplicationContext implements INCAppli
     private Object appObject;
     private IStateDumpHandler sdh;
     private final NodeControllerService ncs;
+    private IChannelInterfaceFactory messagingChannelInterfaceFactory;
 
-    public NCApplicationContext(NodeControllerService ncs, ServerContext serverCtx, IHyracksRootContext rootCtx, String nodeId,
-                                MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager,
-                                IApplicationConfig appConfig) throws IOException {
+    public NCApplicationContext(NodeControllerService ncs, ServerContext serverCtx, IHyracksRootContext rootCtx,
+            String nodeId, MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager,
+            IApplicationConfig appConfig) throws IOException {
         super(serverCtx, appConfig);
         this.lccm = lifeCyclecomponentManager;
         this.nodeId = nodeId;
@@ -108,4 +110,14 @@ public class NCApplicationContext extends ApplicationContext implements INCAppli
     public IControllerService getControllerService() {
         return ncs;
     }
+
+    @Override
+    public IChannelInterfaceFactory getMessagingChannelInterfaceFactory() {
+        return messagingChannelInterfaceFactory;
+    }
+
+    @Override
+    public void setMessagingChannelInterfaceFactory(IChannelInterfaceFactory interfaceFactory) {
+        this.messagingChannelInterfaceFactory = interfaceFactory;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
index 3c99ef3..0b74806 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/DatasetNetworkManager.java
@@ -25,15 +25,16 @@ import java.nio.ByteBuffer;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
-import org.apache.hyracks.net.exceptions.NetException;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
 import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
@@ -58,19 +59,24 @@ public class DatasetNetworkManager implements IChannelConnectionFactory {
     private NetworkAddress publicNetworkAddress;
 
     /**
-     * @param inetAddress - Internet address to bind the listen port to
-     * @param inetPort - Port to bind on inetAddress
-     * @param publicInetAddress - Internet address to report to consumers;
-     *    useful when behind NAT. null = same as inetAddress
-     * @param publicInetPort - Port to report to consumers; useful when
-     *    behind NAT. Ignored if publicInetAddress is null. 0 = same as inetPort
+     * @param inetAddress
+     *            - Internet address to bind the listen port to
+     * @param inetPort
+     *            - Port to bind on inetAddress
+     * @param publicInetAddress
+     *            - Internet address to report to consumers;
+     *            useful when behind NAT. null = same as inetAddress
+     * @param publicInetPort
+     *            - Port to report to consumers; useful when
+     *            behind NAT. Ignored if publicInetAddress is null. 0 = same as inetPort
      */
-    public DatasetNetworkManager(String inetAddress, int inetPort, IDatasetPartitionManager partitionManager, int nThreads,
-                                 int nBuffers, String publicInetAddress, int publicInetPort) throws IOException {
+    public DatasetNetworkManager(String inetAddress, int inetPort, IDatasetPartitionManager partitionManager,
+            int nThreads, int nBuffers, String publicInetAddress, int publicInetPort,
+            IChannelInterfaceFactory channelInterfaceFactory) {
         this.partitionManager = partitionManager;
         this.nBuffers = nBuffers;
         md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
-                MAX_CONNECTION_ATTEMPTS);
+                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
         // Just save these values for the moment; may be reset in start()
         publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
     }
@@ -84,12 +90,10 @@ public class DatasetNetworkManager implements IChannelConnectionFactory {
         // make it a copy of localNetworkAddress
         if (publicNetworkAddress.getAddress() == null) {
             publicNetworkAddress = localNetworkAddress;
-        }
-        else {
+        } else {
             // Likewise for public port
             if (publicNetworkAddress.getPort() == 0) {
-                publicNetworkAddress = new NetworkAddress
-                    (publicNetworkAddress.getAddress(), sockAddr.getPort());
+                publicNetworkAddress = new NetworkAddress(publicNetworkAddress.getAddress(), sockAddr.getPort());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
new file mode 100644
index 0000000..7983b93
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.comm.IChannelControlBlock;
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
+import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
+import org.apache.hyracks.net.protocols.muxdemux.MuxDemux;
+import org.apache.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
+
+public class MessagingNetworkManager {
+
+    private static final Logger LOGGER = Logger.getLogger(MessagingNetworkManager.class.getName());
+    private static final int MAX_CONNECTION_ATTEMPTS = 5;
+    private final MuxDemux md;
+    private NetworkAddress localNetworkAddress;
+    private NetworkAddress publicNetworkAddress;
+    private final Map<String, IChannelControlBlock> ncChannels = new HashMap<>();
+    private final NodeControllerService ncs;
+    private final Map<IChannelControlBlock, ICloseableBufferAcceptor> channelFullBufferAcceptor = new HashMap<>();
+
+    public MessagingNetworkManager(NodeControllerService ncs, String inetAddress, int inetPort, int nThreads,
+            String publicInetAddress, int publicInetPort, IChannelInterfaceFactory channelInterfaceFactory) {
+        this.ncs = ncs;
+        md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
+                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+        publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
+    }
+
+    public void start() throws IOException {
+        md.start();
+        InetSocketAddress sockAddr = md.getLocalAddress();
+        localNetworkAddress = new NetworkAddress(sockAddr.getHostString(), sockAddr.getPort());
+
+        // See if the public address was explicitly specified, and if not,
+        // make it a copy of localNetworkAddress
+        if (publicNetworkAddress.getAddress() == null) {
+            publicNetworkAddress = localNetworkAddress;
+        } else {
+            // Likewise for public port
+            if (publicNetworkAddress.getPort() == 0) {
+                publicNetworkAddress = new NetworkAddress(publicNetworkAddress.getAddress(), sockAddr.getPort());
+            }
+        }
+    }
+
+    public void stop() {
+        // Currently there is nothing to stop
+    }
+
+    public IChannelControlBlock getMessagingChannel(String nodeId) throws Exception {
+        synchronized (ncChannels) {
+            IChannelControlBlock ccb = ncChannels.get(nodeId);
+            if (ccb == null) {
+                // Establish new connection
+                ccb = establishNewConnection(nodeId);
+                addOpenChannel(nodeId, ccb);
+            }
+            return ccb;
+        }
+    }
+
+    private ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
+        MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
+        return mConn.openChannel();
+    }
+
+    public MuxDemuxPerformanceCounters getPerformanceCounters() {
+        return md.getPerformanceCounters();
+    }
+
+    public NetworkAddress getPublicNetworkAddress() {
+        return publicNetworkAddress;
+    }
+
+    private void prepareMessagingInitialMessage(String ncId, final ByteBuffer buffer) throws NetException {
+        /*
+         * The messaging initial message contains the node id of the node
+         * which requested the channel to be opened.
+         */
+        int intialMsgLength = Integer.BYTES + ncId.length();
+        if (intialMsgLength > buffer.capacity()) {
+            throw new NetException("Initial message exceded the channel buffer size " + buffer.capacity() + " bytes");
+        }
+        buffer.clear();
+        buffer.putInt(ncId.length());
+        buffer.put(ncId.getBytes());
+        buffer.flip();
+    }
+
+    private IChannelControlBlock establishNewConnection(String nodeId) throws Exception {
+        Map<String, NodeControllerInfo> nodeControllers = ncs.getNodeControllersInfo();
+
+        // Get the node messaging address from its info
+        NodeControllerInfo nodeControllerInfo = nodeControllers.get(nodeId);
+        if (nodeControllerInfo == null) {
+            throw new NetException("Could not find node: " + nodeId);
+        }
+        NetworkAddress nodeMessagingNeAddress = nodeControllerInfo.getMessagingNetworkAddress();
+        SocketAddress nodeAddress = new InetSocketAddress(InetAddress.getByName(nodeMessagingNeAddress.getAddress()),
+                nodeMessagingNeAddress.getPort());
+
+        // Open the channel
+        IChannelControlBlock ccb = connect(nodeAddress);
+        try {
+            // Prepare the initial message buffer
+            ByteBuffer initialBuffer = ccb.getReadInterface().getBufferFactory().createBuffer();
+            prepareMessagingInitialMessage(ncs.getId(), initialBuffer);
+            // Send the initial messaging channel handshake message to register the opened channel on both nodes
+            ccb.getWriteInterface().getFullBufferAcceptor().accept(initialBuffer);
+            return ccb;
+        } catch (NetException e) {
+            closeChannel(ccb);
+            throw e;
+        }
+    }
+
+    private void addOpenChannel(final String nodeId, final IChannelControlBlock ccb) {
+        synchronized (ncChannels) {
+            if (ncChannels.get(nodeId) == null) {
+                ncChannels.put(nodeId, ccb);
+            } else {
+                closeChannel(ccb);
+                /*
+                 * TODO Currently there is a chance that two nodes will open
+                 * a channel to each other at the exact same time and both will
+                 * end up using a half closed channel. While this isn't a big issue,
+                 * it should be eliminated by introducing negotiation protocol
+                 * between nodes to decide which channel to use and which channel
+                 * to close fully.
+                 */
+            }
+        }
+    }
+
+    private void closeChannel(IChannelControlBlock ccb) {
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
+    }
+
+    private class ChannelOpenListener implements IChannelOpenListener {
+        @Override
+        public void channelOpened(ChannelControlBlock channel) {
+            // Store the channel's original acceptor (which is set by the application)
+            ICloseableBufferAcceptor fullBufferAcceptor = channel.getReadInterface().getFullBufferAcceptor();
+            synchronized (channelFullBufferAcceptor) {
+                channelFullBufferAcceptor.put(channel, fullBufferAcceptor);
+            }
+            // Temporary set the acceptor to InitialBufferAcceptor to read the initial message
+            channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
+        }
+    }
+
+    private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
+        private final ChannelControlBlock ccb;
+
+        public InitialBufferAcceptor(ChannelControlBlock ccb) {
+            this.ccb = ccb;
+        }
+
+        @Override
+        public void accept(ByteBuffer buffer) {
+            String nodeId = readMessagingInitialMessage(buffer);
+            if (LOGGER.isLoggable(Level.FINE)) {
+                LOGGER.fine("Opened messaging channel with node: " + nodeId);
+            }
+            // Return the channel's original acceptor
+            ICloseableBufferAcceptor originalAcceptor;
+            synchronized (channelFullBufferAcceptor) {
+                originalAcceptor = channelFullBufferAcceptor.remove(ccb);
+                if (originalAcceptor == null) {
+                    throw new IllegalStateException("Could not find channel acceptor");
+                }
+            }
+            ccb.getReadInterface().setFullBufferAcceptor(originalAcceptor);
+            addOpenChannel(nodeId, ccb);
+        }
+
+        @Override
+        public void close() {
+            // Nothing to close
+        }
+
+        @Override
+        public void error(int ecode) {
+            // Errors should be handled in the application
+        }
+
+        private String readMessagingInitialMessage(ByteBuffer buffer) {
+            int nodeIdLength = buffer.getInt();
+            byte[] stringBytes = new byte[nodeIdLength];
+            buffer.get(stringBytes);
+            return new String(stringBytes);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 4b28aef..325966a 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -25,16 +25,17 @@ import java.nio.ByteBuffer;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
+import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
 import org.apache.hyracks.control.nc.partitions.PartitionManager;
-import org.apache.hyracks.net.buffers.ICloseableBufferAcceptor;
-import org.apache.hyracks.net.exceptions.NetException;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
 import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection;
@@ -58,13 +59,13 @@ public class NetworkManager implements IChannelConnectionFactory {
 
     private NetworkAddress publicNetworkAddress;
 
-    public NetworkManager(String inetAddress, int inetPort, PartitionManager partitionManager, int nThreads, int nBuffers,
-                          String publicInetAddress, int publicInetPort)
-            throws IOException {
+    public NetworkManager(String inetAddress, int inetPort, PartitionManager partitionManager, int nThreads,
+            int nBuffers, String publicInetAddress, int publicInetPort,
+            IChannelInterfaceFactory channelInterfaceFactory) {
         this.partitionManager = partitionManager;
         this.nBuffers = nBuffers;
         md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads,
-                MAX_CONNECTION_ATTEMPTS);
+                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
         // Just save these values for the moment; may be reset in start()
         publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort);
     }
@@ -78,12 +79,10 @@ public class NetworkManager implements IChannelConnectionFactory {
         // make it a copy of localNetworkAddress
         if (publicNetworkAddress.getAddress() == null) {
             publicNetworkAddress = localNetworkAddress;
-        }
-        else {
+        } else {
             // Likewise for public port
             if (publicNetworkAddress.getPort() == 0) {
-                publicNetworkAddress = new NetworkAddress
-                    (publicNetworkAddress.getAddress(), sockAddr.getPort());
+                publicNetworkAddress = new NetworkAddress(publicNetworkAddress.getAddress(), sockAddr.getPort());
             }
         }
     }
@@ -100,6 +99,7 @@ public class NetworkManager implements IChannelConnectionFactory {
 
     }
 
+    @Override
     public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
         MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
         return mConn.openChannel();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
index 373fe21..13cb171 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/scheduler/SchedulerTest.java
@@ -21,24 +21,22 @@ package org.apache.hyracks.hdfs.scheduler;
 
 import java.io.FileReader;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.util.HashMap;
 import java.util.Map;
 
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
-import org.xml.sax.InputSource;
-import org.xml.sax.SAXException;
-
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.api.topology.TopologyDefinitionParser;
+import org.apache.hyracks.hdfs.utils.TestUtils;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
 
 @SuppressWarnings("deprecation")
 public class SchedulerTest extends TestCase {
@@ -60,13 +58,8 @@ public class SchedulerTest extends TestCase {
      * @throws Exception
      */
     public void testSchedulerSimple() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         InputSplit[] fileSplits = new InputSplit[6];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -98,13 +91,17 @@ public class SchedulerTest extends TestCase {
      * @throws Exception
      */
     public void testSchedulerLargerHDFS() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc7", new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress("10.0.0.7", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc12", new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress("10.0.0.12", 5099), new NetworkAddress("10.0.0.5", 5098)));
+        int dataPort = 5099;
+        int resultPort = 5098;
+        int messagingPort = 5097;
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(4, "nc", "10.0.0.",
+                dataPort, resultPort, messagingPort);
+        ncNameToNcInfos.put("nc7",
+                new NodeControllerInfo("nc7", NodeStatus.ALIVE, new NetworkAddress("10.0.0.7", dataPort),
+                        new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort)));
+        ncNameToNcInfos.put("nc12",
+                new NodeControllerInfo("nc12", NodeStatus.ALIVE, new NetworkAddress("10.0.0.12", dataPort),
+                        new NetworkAddress("10.0.0.5", resultPort), new NetworkAddress("10.0.0.5", messagingPort)));
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -115,7 +112,8 @@ public class SchedulerTest extends TestCase {
         fileSplits[5] = new FileSplit(new Path("part-6"), 0, 0, new String[] { "10.0.0.2", "10.0.0.3", "10.0.0.5" });
         fileSplits[6] = new FileSplit(new Path("part-7"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
         fileSplits[7] = new FileSplit(new Path("part-8"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.5" });
-        fileSplits[8] = new FileSplit(new Path("part-12"), 0, 0, new String[] { "10.0.0.14", "10.0.0.11", "10.0.0.13" });
+        fileSplits[8] = new FileSplit(new Path("part-12"), 0, 0,
+                new String[] { "10.0.0.14", "10.0.0.11", "10.0.0.13" });
         fileSplits[9] = new FileSplit(new Path("part-10"), 0, 0, new String[] { "10.0.0.2", "10.0.0.1", "10.0.0.6" });
         fileSplits[10] = new FileSplit(new Path("part-11"), 0, 0, new String[] { "10.0.0.3", "10.0.0.4", "10.0.0.7" });
         fileSplits[11] = new FileSplit(new Path("part-9"), 0, 0, new String[] { "10.0.0.4", "10.0.0.5", "10.0.0.6" });
@@ -123,8 +121,8 @@ public class SchedulerTest extends TestCase {
         Scheduler scheduler = new Scheduler(ncNameToNcInfos);
         String[] locationConstraints = scheduler.getLocationConstraints(fileSplits);
 
-        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc12",
-                "nc7", "nc7", "nc12" };
+        String[] expectedResults = new String[] { "nc1", "nc4", "nc4", "nc1", "nc3", "nc2", "nc2", "nc3", "nc12", "nc7",
+                "nc7", "nc12" };
         for (int i = 0; i < locationConstraints.length; i++) {
             Assert.assertEquals(locationConstraints[i], expectedResults[i]);
         }
@@ -145,13 +143,8 @@ public class SchedulerTest extends TestCase {
      * @throws Exception
      */
     public void testSchedulerSmallerHDFS() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         InputSplit[] fileSplits = new InputSplit[12];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -191,13 +184,8 @@ public class SchedulerTest extends TestCase {
      * @throws Exception
      */
     public void testSchedulerSmallerHDFSOdd() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         InputSplit[] fileSplits = new InputSplit[13];
         fileSplits[0] = new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" });
@@ -239,13 +227,8 @@ public class SchedulerTest extends TestCase {
      * @throws Exception
      */
     public void testSchedulercBoundary() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         /** test empty file splits */
         InputSplit[] fileSplits = new InputSplit[0];

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
index c2ad4a0..8755cf3 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs/utils/TestUtils.java
@@ -22,6 +22,12 @@ package org.apache.hyracks.hdfs.utils;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.api.client.NodeStatus;
+import org.apache.hyracks.api.comm.NetworkAddress;
 
 public class TestUtils {
 
@@ -38,8 +44,8 @@ public class TestUtils {
                     throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
                 }
                 if (!equalStrings(lineExpected, lineActual)) {
-                    throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
-                            + lineActual);
+                    throw new Exception(
+                            "Result for changed at line " + num + ":\n< " + lineExpected + "\n> " + lineActual);
                 }
                 ++num;
             }
@@ -94,4 +100,16 @@ public class TestUtils {
         return true;
     }
 
+    public static Map<String, NodeControllerInfo> generateNodeControllerInfo(int numberOfNodes, String ncNamePrefix,
+            String addressPrefix, int netPort, int dataPort, int messagingPort) {
+        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
+        for (int i = 1; i <= numberOfNodes; i++) {
+            String ncId = ncNamePrefix + i;
+            String ncAddress = addressPrefix + i;
+            ncNameToNcInfos.put(ncId,
+                    new NodeControllerInfo(ncId, NodeStatus.ALIVE, new NetworkAddress(ncAddress, netPort),
+                            new NetworkAddress(ncAddress, dataPort), new NetworkAddress(ncAddress, messagingPort)));
+        }
+        return ncNameToNcInfos;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fb7f4d32/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
index 6eabb71..793e029 100644
--- a/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/test/java/org/apache/hyracks/hdfs2/scheduler/SchedulerTest.java
@@ -19,22 +19,18 @@
 
 package org.apache.hyracks.hdfs2.scheduler;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
 import org.apache.hyracks.api.client.NodeControllerInfo;
-import org.apache.hyracks.api.client.NodeStatus;
-import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.hdfs.utils.TestUtils;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
 
 /**
  * Test case for the new HDFS API scheduler
@@ -47,13 +43,8 @@ public class SchedulerTest extends TestCase {
      * @throws Exception
      */
     public void testSchedulerSimple() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -79,13 +70,8 @@ public class SchedulerTest extends TestCase {
      * @throws Exception
      */
     public void testSchedulerLargerHDFS() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -118,13 +104,8 @@ public class SchedulerTest extends TestCase {
      * @throws Exception
      */
     public void testSchedulerSmallerHDFS() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));
@@ -157,13 +138,8 @@ public class SchedulerTest extends TestCase {
      * @throws Exception
      */
     public void testSchedulerSmallerHDFSOdd() throws Exception {
-        Map<String, NodeControllerInfo> ncNameToNcInfos = new HashMap<String, NodeControllerInfo>();
-        ncNameToNcInfos.put("nc1", new NodeControllerInfo("nc1", NodeStatus.ALIVE, new NetworkAddress("10.0.0.1", 5099), new NetworkAddress("10.0.0.1", 5098)));
-        ncNameToNcInfos.put("nc2", new NodeControllerInfo("nc2", NodeStatus.ALIVE, new NetworkAddress("10.0.0.2", 5099), new NetworkAddress("10.0.0.2", 5098)));
-        ncNameToNcInfos.put("nc3", new NodeControllerInfo("nc3", NodeStatus.ALIVE, new NetworkAddress("10.0.0.3", 5099), new NetworkAddress("10.0.0.3", 5098)));
-        ncNameToNcInfos.put("nc4", new NodeControllerInfo("nc4", NodeStatus.ALIVE, new NetworkAddress("10.0.0.4", 5099), new NetworkAddress("10.0.0.4", 5098)));
-        ncNameToNcInfos.put("nc5", new NodeControllerInfo("nc5", NodeStatus.ALIVE, new NetworkAddress("10.0.0.5", 5099), new NetworkAddress("10.0.0.5", 5098)));
-        ncNameToNcInfos.put("nc6", new NodeControllerInfo("nc6", NodeStatus.ALIVE, new NetworkAddress("10.0.0.6", 5099), new NetworkAddress("10.0.0.6", 5098)));
+        Map<String, NodeControllerInfo> ncNameToNcInfos = TestUtils.generateNodeControllerInfo(6, "nc", "10.0.0.", 5099,
+                5098, 5097);
 
         List<InputSplit> fileSplits = new ArrayList<InputSplit>();
         fileSplits.add(new FileSplit(new Path("part-1"), 0, 0, new String[] { "10.0.0.1", "10.0.0.2", "10.0.0.3" }));


Mime
View raw message