helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [2/4] git commit: [HELIX-470] Netty-based IPC layer
Date Thu, 28 Aug 2014 18:00:16 GMT
[HELIX-470] Netty-based IPC layer


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f2475fa9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f2475fa9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f2475fa9

Branch: refs/heads/master
Commit: f2475fa9a6123052fea2588cdd4e439ddc7af020
Parents: bfe5d2d
Author: Greg Brandt <brandt.greg@gmail.com>
Authored: Tue Aug 26 13:14:36 2014 -0700
Committer: Greg Brandt <brandt.greg@gmail.com>
Committed: Tue Aug 26 13:14:36 2014 -0700

----------------------------------------------------------------------
 .../helix/spectator/RoutingTableProvider.java   |  24 +-
 .../test/java/org/apache/helix/TestHelper.java  |  10 +
 helix-ipc/LICENSE                               | 273 +++++++++++
 helix-ipc/NOTICE                                |  33 ++
 helix-ipc/pom.xml                               | 141 ++++++
 helix-ipc/src/assemble/assembly.xml             |  60 +++
 helix-ipc/src/main/config/log4j.properties      |  31 ++
 .../org/apache/helix/ipc/HelixIPCCallback.java  |  32 ++
 .../helix/ipc/HelixIPCMessageManager.java       | 163 ++++++
 .../org/apache/helix/ipc/HelixIPCService.java   |  49 ++
 .../helix/ipc/netty/NettyHelixIPCService.java   | 490 +++++++++++++++++++
 .../helix/resolver/AbstractHelixResolver.java   | 295 +++++++++++
 .../org/apache/helix/resolver/HelixAddress.java |  70 +++
 .../helix/resolver/HelixMessageScope.java       | 151 ++++++
 .../apache/helix/resolver/HelixResolver.java    |  47 ++
 .../helix/resolver/ResolverRoutingTable.java    |  92 ++++
 .../helix/resolver/zk/ZKHelixResolver.java      |  45 ++
 .../helix/ipc/TestNettyHelixIPCService.java     | 353 +++++++++++++
 .../helix/ipc/benchmark/BenchmarkDriver.java    | 221 +++++++++
 .../helix/resolver/TestZKHelixResolver.java     | 161 ++++++
 helix-ipc/src/test/resources/build_benchmark.sh |  29 ++
 helix-ipc/src/test/resources/run_benchmark.sh   |  37 ++
 pom.xml                                         |   1 +
 23 files changed, 2806 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 7799ca1..ccce64a 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -38,7 +38,8 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.log4j.Logger;
 
-public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener {
+public class RoutingTableProvider implements ExternalViewChangeListener,
+    InstanceConfigChangeListener {
   private static final Logger logger = Logger.getLogger(RoutingTableProvider.class);
   private final AtomicReference<RoutingTable> _routingTableRef;
 
@@ -91,6 +92,15 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
     return instanceSet;
   }
 
+  /**
+   * Get the configuration of an instance from its name
+   * @param instanceName the instance ID
+   * @return InstanceConfig if present, null otherwise
+   */
+  public InstanceConfig getInstanceConfig(String instanceName) {
+    return _routingTableRef.get().getConfig(instanceName);
+  }
+
   @Override
   public void onExternalViewChange(List<ExternalView> externalViewList,
       NotificationContext changeContext) {
@@ -124,12 +134,13 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
     HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
     Builder keyBuilder = accessor.keyBuilder();
 
+    RoutingTable newRoutingTable = new RoutingTable();
     List<InstanceConfig> configList = accessor.getChildValues(keyBuilder.instanceConfigs());
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
     for (InstanceConfig config : configList) {
       instanceConfigMap.put(config.getId(), config);
+      newRoutingTable.addConfig(config);
     }
-    RoutingTable newRoutingTable = new RoutingTable();
     if (externalViewList != null) {
       for (ExternalView extView : externalViewList) {
         String resourceName = extView.getId();
@@ -154,9 +165,11 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
 
   class RoutingTable {
     private final HashMap<String, ResourceInfo> resourceInfoMap;
+    private final Map<String, InstanceConfig> instanceConfigMap;
 
     public RoutingTable() {
       resourceInfoMap = new HashMap<String, RoutingTableProvider.ResourceInfo>();
+      instanceConfigMap = new HashMap<String, InstanceConfig>();
     }
 
     public void addEntry(String resourceName, String partitionName, String state,
@@ -169,10 +182,17 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
 
     }
 
+    public void addConfig(InstanceConfig config) {
+      instanceConfigMap.put(config.getInstanceName(), config);
+    }
+
     ResourceInfo get(String resourceName) {
       return resourceInfoMap.get(resourceName);
     }
 
+    InstanceConfig getConfig(String instanceName) {
+      return instanceConfigMap.get(instanceName);
+    }
   }
 
   class ResourceInfo {

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 4a9139f..64796ba 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -20,7 +20,9 @@ package org.apache.helix;
  */
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Method;
+import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -769,4 +771,12 @@ public class TestHelper {
     System.out.println(sb.toString());
   }
 
+  public static int getRandomPort() throws IOException {
+    ServerSocket sock = new ServerSocket();
+    sock.bind(null);
+    int port = sock.getLocalPort();
+    sock.close();
+    return port;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/LICENSE
----------------------------------------------------------------------
diff --git a/helix-ipc/LICENSE b/helix-ipc/LICENSE
new file mode 100644
index 0000000..413913f
--- /dev/null
+++ b/helix-ipc/LICENSE
@@ -0,0 +1,273 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+
+
+For xstream:
+
+Copyright (c) 2003-2006, Joe Walnes
+Copyright (c) 2006-2009, 2011 XStream Committers
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this list of
+conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice, this list of
+conditions and the following disclaimer in the documentation and/or other materials provided
+with the distribution.
+
+3. Neither the name of XStream nor the names of its contributors may be used to endorse
+or promote products derived from this software without specific prior written
+permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
+WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
+
+for jline:
+
+Copyright (c) 2002-2006, Marc Prud'hommeaux <mwp1@cornell.edu>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/NOTICE
----------------------------------------------------------------------
diff --git a/helix-ipc/NOTICE b/helix-ipc/NOTICE
new file mode 100644
index 0000000..1ee0d24
--- /dev/null
+++ b/helix-ipc/NOTICE
@@ -0,0 +1,33 @@
+Apache Helix
+Copyright 2014 The Apache Software Foundation
+
+
+I. Included Software
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+Codehaus (http://www.codehaus.org/).
+Licensed under the BSD License.
+
+This product includes software developed at
+jline (http://jline.sourceforge.net/).
+Licensed under the BSD License.
+
+This product includes software developed at
+Google (http://www.google.com/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+snakeyaml (http://www.snakeyaml.org/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+zkclient (https://github.com/sgroschupf/zkclient).
+Licensed under the Apache License 2.0.
+
+II. License Summary
+- Apache License 2.0
+- BSD License

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/pom.xml
----------------------------------------------------------------------
diff --git a/helix-ipc/pom.xml b/helix-ipc/pom.xml
new file mode 100644
index 0000000..90ab11d
--- /dev/null
+++ b/helix-ipc/pom.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <groupId>org.apache.helix</groupId>
+    <artifactId>helix</artifactId>
+    <version>0.7.1-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>helix-ipc</artifactId>
+  <packaging>bundle</packaging>
+
+  <name>Apache Helix :: IPC</name>
+
+  <properties>
+    <osgi.import>
+      javax.management*,
+      org.apache.commons.math*;version="[2.1,3)",
+      org.apache.log4j*;version="[1.2,2)",
+      org.restlet;version="[2.1.4,3]",
+      *
+    </osgi.import>
+    <osgi.ignore>
+      org.apache.helix.tools*
+    </osgi.ignore>
+    <osgi.export>org.apache.helix*;version="${project.version};-noimport:=true</osgi.export>
+  </properties>
+
+  <dependencies>
+      <dependency>
+          <groupId>org.apache.helix</groupId>
+          <artifactId>helix-core</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
+          <version>4.0.21.Final</version>
+      </dependency>
+      <dependency>
+          <groupId>com.codahale.metrics</groupId>
+          <artifactId>metrics-core</artifactId>
+          <version>3.0.1</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.helix</groupId>
+          <artifactId>helix-core</artifactId>
+          <type>test-jar</type>
+          <scope>test</scope>
+      </dependency>
+      <dependency>
+          <groupId>org.testng</groupId>
+          <artifactId>testng</artifactId>
+          <scope>test</scope>
+          <exclusions>
+              <exclusion>
+                  <groupId>junit</groupId>
+                  <artifactId>junit</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+  </dependencies>
+  <build>
+    <resources>
+      <resource>
+        <directory>${basedir}/src/main/resources</directory>
+        <filtering>true</filtering>
+      </resource>
+      <resource>
+        <directory>${basedir}</directory>
+        <includes>
+          <include>DISCLAIMER</include>
+        </includes>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>appassembler-maven-plugin</artifactId>
+        <configuration>
+          <programs>
+            <program>
+              <mainClass>org.apache.helix.ipc.netty.NettyHelixIPCService</mainClass>
+              <name>netty-helix-ipc-service</name>
+            </program>
+          </programs>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <!--
+          <descriptors>
+            <descriptor>src/assemble/assembly.xml</descriptor>
+          </descriptors>
+          -->
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/assemble/assembly.xml
----------------------------------------------------------------------
diff --git a/helix-ipc/src/assemble/assembly.xml b/helix-ipc/src/assemble/assembly.xml
new file mode 100644
index 0000000..c2d08a1
--- /dev/null
+++ b/helix-ipc/src/assemble/assembly.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly>
+  <id>pkg</id>
+  <formats>
+    <format>tar</format>
+  </formats>
+  <fileSets>
+    <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <lineEnding>unix</lineEnding>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/repo/</directory>
+      <outputDirectory>repo</outputDirectory>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+      <excludes>
+        <exclude>**/*.xml</exclude>
+      </excludes>
+    </fileSet>
+     <fileSet>
+      <directory>${project.build.directory}/${project.artifactId}-pkg/conf</directory>
+      <outputDirectory>conf</outputDirectory>
+      <lineEnding>unix</lineEnding>
+      <fileMode>0755</fileMode>
+      <directoryMode>0755</directoryMode>
+    </fileSet>
+    <fileSet>
+      <directory>${project.basedir}</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>LICENSE</include>
+        <include>NOTICE</include>
+        <include>DISCLAIMER</include>
+      </includes>
+      <fileMode>0755</fileMode>
+    </fileSet>
+  </fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/config/log4j.properties b/helix-ipc/src/main/config/log4j.properties
new file mode 100644
index 0000000..4b3dc31
--- /dev/null
+++ b/helix-ipc/src/main/config/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set root logger level to ERROR and its only appender to A1.
+log4j.rootLogger=ERROR,A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java
new file mode 100644
index 0000000..9ea23ca
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java
@@ -0,0 +1,32 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import org.apache.helix.resolver.HelixMessageScope;
+
+import java.util.UUID;
+
+/**
+ * Callback registered per message type to handle messages.
+ * <p>
+ * {@code onMessage} is handed the scope, the ID and the buffer of the received message.
+ * NOTE(review): NettyHelixIPCService releases the buffer right after onMessage returns, so an
+ * implementation that needs the bytes afterwards presumably has to retain or copy them - confirm
+ * whether every HelixIPCService implementation behaves the same way.
+ * </p>
+ */
+public interface HelixIPCCallback {
+  void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message);
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java
new file mode 100644
index 0000000..89b10eb
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java
@@ -0,0 +1,163 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.log4j.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A wrapper around a base IPC service that manages message retries / timeouts.
+ * <p>
+ * This class manages retries and timeouts, and can be used in the same way as a
+ * {@link org.apache.helix.ipc.HelixIPCService}.
+ * </p>
+ * <p>
+ * A message will be sent until the max number of retries has been reached (i.e. timed out), or it
+ * is acknowledged by the recipient. If the max number of retries is -1, it will be retried forever.
+ * Once a message has timed out, its retry state is discarded and its buffer is released, so an
+ * acknowledgement that arrives afterwards is ignored.
+ * </p>
+ * <p>
+ * A callback should be registered for every acknowledgement message type associated with any
+ * original message type sent by this class.
+ * </p>
+ * <p>
+ * For example, consider we have the two message types defined: DATA_REQ = 1, DATA_ACK = 2. One
+ * would do the following:
+ * 
+ * <pre>
+ * messageManager.registerCallback(DATA_ACK, new HelixIPCCallback() {
+ *   public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+ *     // Process ACK
+ *   }
+ * });
+ * 
+ * messageManager.send(destination, DATA_REQ, messageId, data);
+ * </pre>
+ * 
+ * </p>
+ * <p>
+ * In send, we note messageId, and retry until we get a DATA_ACK for the same messageId. The
+ * callback registered with the message manager will only be called once, even if the message is
+ * acknowledged several times.
+ * </p>
+ */
+public class HelixIPCMessageManager implements HelixIPCService {
+
+  private static final Logger LOG = Logger.getLogger(HelixIPCMessageManager.class);
+
+  private final ScheduledExecutorService scheduler;
+  private final HelixIPCService baseIpcService;
+  private final long messageTimeoutMillis;
+  private final int maxNumRetries;
+  // messageId -> true while the message still waits for its acknowledgement
+  private final ConcurrentMap<UUID, Boolean> pendingMessages;
+  // messageId -> remaining retry budget
+  private final ConcurrentMap<UUID, AtomicInteger> retriesLeft;
+  // messageId -> buffer kept alive until the message is acknowledged or times out
+  private final ConcurrentMap<UUID, ByteBuf> messageBuffers;
+  private final ConcurrentMap<Integer, HelixIPCCallback> callbacks;
+
+  /**
+   * @param scheduler executor on which the retry / timeout checks are scheduled
+   * @param baseIpcService service that actually transmits messages and receives callbacks
+   * @param messageTimeoutMillis delay between a transmission and its retry / timeout check
+   * @param maxNumRetries retry budget per message ID, or -1 to retry forever
+   */
+  public HelixIPCMessageManager(ScheduledExecutorService scheduler, HelixIPCService baseIpcService,
+      long messageTimeoutMillis, int maxNumRetries) {
+    this.scheduler = scheduler;
+    this.baseIpcService = baseIpcService;
+    this.maxNumRetries = maxNumRetries;
+    this.messageTimeoutMillis = messageTimeoutMillis;
+    this.pendingMessages = new ConcurrentHashMap<UUID, Boolean>();
+    this.retriesLeft = new ConcurrentHashMap<UUID, AtomicInteger>();
+    this.messageBuffers = new ConcurrentHashMap<UUID, ByteBuf>();
+    this.callbacks = new ConcurrentHashMap<Integer, HelixIPCCallback>();
+  }
+
+  /** Starts the base IPC service. */
+  @Override
+  public void start() throws Exception {
+    baseIpcService.start();
+  }
+
+  /** Shuts down the base IPC service. */
+  @Override
+  public void shutdown() throws Exception {
+    baseIpcService.shutdown();
+  }
+
+  /**
+   * Sends the message through the base service and schedules a check after messageTimeoutMillis
+   * that either re-sends it (while it is still pending and retries are left) or gives up.
+   * <p>
+   * The buffer is retained once per transmission and released once when the message is
+   * acknowledged or has timed out. NOTE(review): this balance presumably relies on the base
+   * service consuming one reference per send - confirm for implementations other than Netty.
+   * </p>
+   */
+  @Override
+  public void send(final HelixAddress destination, final int messageType, final UUID messageId,
+      final ByteBuf message) {
+    // State
+    pendingMessages.put(messageId, true);
+    retriesLeft.putIfAbsent(messageId, new AtomicInteger(maxNumRetries));
+    messageBuffers.put(messageId, message);
+
+    // Will free it when we've finally received response (or given up)
+    message.retain();
+
+    // Send initial message
+    baseIpcService.send(destination, messageType, messageId, message);
+
+    // Retries
+    scheduler.schedule(new Runnable() {
+      @Override
+      public void run() {
+        Boolean isPending = pendingMessages.get(messageId);
+        AtomicInteger numLeft = retriesLeft.get(messageId);
+        if (numLeft != null && isPending != null && isPending) {
+          if (numLeft.decrementAndGet() > 0 || maxNumRetries == -1) {
+            // n.b. will schedule another retry
+            send(destination, messageType, messageId, message);
+          } else {
+            LOG.warn("Message " + messageId + " timed out after " + maxNumRetries + " retries");
+            // Giving up: without this the maps and the retained buffer would leak forever
+            clearPending(messageId);
+          }
+        }
+      }
+    }, messageTimeoutMillis, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Registers a wrapper with the base service that forwards a message to the given callback only
+   * if its ID is still pending, after discarding the retry state kept for that ID.
+   */
+  @Override
+  public void registerCallback(final int messageType, final HelixIPCCallback callback) {
+
+    // This callback will first check if the message is pending, then delegate to the provided
+    // callback if it has not yet done so.
+    HelixIPCCallback wrappedCallback = new HelixIPCCallback() {
+      @Override
+      public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+        if (clearPending(messageId)) {
+          callback.onMessage(scope, messageId, message);
+        }
+      }
+    };
+
+    callbacks.put(messageType, wrappedCallback);
+    baseIpcService.registerCallback(messageType, wrappedCallback);
+  }
+
+  /**
+   * Atomically claims a pending message and drops everything kept for it (pending flag, retry
+   * counter, retained buffer).
+   * @return true if this call claimed the message, false if it was not pending (any more)
+   */
+  private boolean clearPending(UUID messageId) {
+    // The replace acts as a gate so that only one of several racing callers cleans up
+    if (!pendingMessages.replace(messageId, true, false)) {
+      return false;
+    }
+    pendingMessages.remove(messageId);
+    ByteBuf originalMessage = messageBuffers.remove(messageId);
+    if (originalMessage != null) {
+      originalMessage.release();
+    }
+    retriesLeft.remove(messageId);
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java
new file mode 100644
index 0000000..6158514
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java
@@ -0,0 +1,49 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import org.apache.helix.resolver.HelixAddress;
+
+import java.util.UUID;
+
+/**
+ * Message-passing service between instances in Helix clusters.
+ * <p>
+ * A message is dispatched asynchronously through {@link #send}; incoming messages are handed to
+ * the callback that was installed for their type through {@link #registerCallback}
+ * </p>
+ */
+public interface HelixIPCService {
+
+  // Interface fields are implicitly public, static and final
+  String IPC_PORT = "IPC_PORT";
+
+  /** Starts the service; this has to happen before {@link #send} is used */
+  void start() throws Exception;
+
+  /** Stops the service and frees whatever resources it holds */
+  void shutdown() throws Exception;
+
+  /** Sends a message to one or more instances that map to a cluster scope. */
+  void send(HelixAddress destination, int messageType, UUID messageId, ByteBuf message);
+
+  /** Installs the callback that handles messages of the given type */
+  void registerCallback(int messageType, HelixIPCCallback callback);
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
new file mode 100644
index 0000000..00d6157
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
@@ -0,0 +1,490 @@
+package org.apache.helix.ipc.netty;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.ipc.HelixIPCCallback;
+import org.apache.helix.ipc.HelixIPCService;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.log4j.Logger;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Provides partition/state-level messaging among nodes in a Helix cluster.
+ * <p>
+ * The message format is (where len == 4B, and contains the length of the next field)
+ * 
+ * <pre>
+ *      +----------------------+
+ *      | totalLength (4B)     |
+ *      +----------------------+
+ *      | version (4B)         |
+ *      +----------------------+
+ *      | messageType (4B)     |
+ *      +----------------------+
+ *      | messageId (16B)      |
+ *      +----------------------+
+ *      | len | cluster        |
+ *      +----------------------+
+ *      | len | resource       |
+ *      +----------------------+
+ *      | len | partition      |
+ *      +----------------------+
+ *      | len | state          |
+ *      +----------------------+
+ *      | len | srcInstance    |
+ *      +----------------------+
+ *      | len | dstInstance    |
+ *      +----------------------+
+ *      | len | message        |
+ *      +----------------------+
+ * </pre>
+ * 
+ * </p>
+ */
+public class NettyHelixIPCService implements HelixIPCService {
+
+  private static final Logger LOG = Logger.getLogger(NettyHelixIPCService.class);
+  private static final int MESSAGE_VERSION = 1;
+
+  // Parameters for length header field of message (tells decoder to interpret but preserve length
+  // field in message)
+  private static final int MAX_FRAME_LENGTH = 1024 * 1024;
+  private static final int LENGTH_FIELD_OFFSET = 0;
+  private static final int LENGTH_FIELD_LENGTH = 4;
+  private static final int LENGTH_ADJUSTMENT = -4;
+  private static final int INITIAL_BYTES_TO_STRIP = 0;
+  private static final int NUM_LENGTH_FIELDS = 7;
+
+  private final Config config;
+  private final AtomicBoolean isShutdown;
+  private final Map<InetSocketAddress, List<Channel>> channelMap;
+  private final ConcurrentMap<Channel, Long> channelOpenTimes;
+  private final MetricRegistry metricRegistry;
+  private final ConcurrentMap<Integer, HelixIPCCallback> callbacks;
+
+  private EventLoopGroup eventLoopGroup;
+  private Bootstrap clientBootstrap;
+  private Meter statTxMsg;
+  private Meter statRxMsg;
+  private Meter statTxBytes;
+  private Meter statRxBytes;
+  private Counter statChannelOpen;
+  private Counter statError;
+  private JmxReporter jmxReporter;
+
+  /**
+   * Creates the service in the shut-down state; nothing is opened until {@link #start()} runs.
+   * @param config instance name, port and connection settings used by this service
+   */
+  public NettyHelixIPCService(Config config) {
+    this.config = config;
+    this.callbacks = new ConcurrentHashMap<Integer, HelixIPCCallback>();
+    this.channelOpenTimes = new ConcurrentHashMap<Channel, Long>();
+    this.channelMap = new HashMap<InetSocketAddress, List<Channel>>();
+    this.metricRegistry = new MetricRegistry();
+    // Begins as "shut down" so that the first start() call performs the initialization
+    this.isShutdown = new AtomicBoolean(true);
+  }
+
+  /**
+   * Starts the message handling server and creates the client bootstrap.
+   * <p>
+   * Registers the metrics and the JMX reporter, binds the server to the configured port and
+   * prepares the bootstrap used for outgoing connections. The call waits until the bind has
+   * completed, so a failure (e.g. the port is already in use) is reported to the caller instead
+   * of being silently dropped. Calling this on an already started service does nothing.
+   * </p>
+   * @throws Exception if the server cannot be bound to the configured port
+   */
+  @Override
+  public void start() throws Exception {
+    if (isShutdown.getAndSet(false)) {
+      eventLoopGroup = new NioEventLoopGroup();
+
+      statTxMsg = metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "txMsg"));
+      statRxMsg = metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "rxMsg"));
+      statTxBytes =
+          metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "txBytes"));
+      statRxBytes =
+          metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "rxBytes"));
+      statChannelOpen =
+          metricRegistry.counter(MetricRegistry.name(NettyHelixIPCService.class, "channelOpen"));
+      statError = metricRegistry.counter(MetricRegistry.name(NettyHelixIPCService.class, "error"));
+
+      // Report metrics via JMX
+      jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
+      jmxReporter.start();
+
+      // Server side: split the inbound stream into frames and hand each frame to the callbacks.
+      // sync() waits for the bind and rethrows its failure.
+      new ServerBootstrap().group(eventLoopGroup).channel(NioServerSocketChannel.class)
+          .option(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_KEEPALIVE, true)
+          .childHandler(new ChannelInitializer<SocketChannel>() {
+            @Override
+            protected void initChannel(SocketChannel socketChannel) throws Exception {
+              socketChannel.pipeline().addLast(
+                  new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET,
+                      LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP));
+              socketChannel.pipeline().addLast(new HelixIPCCallbackHandler());
+            }
+          }).bind(new InetSocketAddress(config.getPort())).sync();
+
+      // Client side: every outgoing message gets the total length prepended
+      clientBootstrap =
+          new Bootstrap().group(eventLoopGroup).channel(NioSocketChannel.class)
+              .option(ChannelOption.SO_KEEPALIVE, true)
+              .handler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel socketChannel) throws Exception {
+                  socketChannel.pipeline().addLast(
+                      new LengthFieldPrepender(LENGTH_FIELD_LENGTH, true));
+                }
+              });
+    }
+  }
+
+  /**
+   * Shuts down event loops for message handling server and message passing client.
+   */
+  public void shutdown() throws Exception {
+    if (!isShutdown.getAndSet(true)) {
+      jmxReporter.stop();
+      eventLoopGroup.shutdownGracefully();
+    }
+  }
+
+  /**
+   * Sends a message to the instance behind the destination's socket address.
+   * <p>
+   * One of the numConnections channels kept for that address is chosen from the hash of the
+   * destination scope; it is (re)connected first if it is missing, closed or older than the
+   * configured max channel life. The header and the payload are then written as one composite
+   * buffer. The result of the asynchronous write itself is not checked here.
+   * </p>
+   * @param destination address (socket address, instance name and scope) of the recipient
+   * @param messageType type used by the recipient to pick the callback
+   * @param messageId ID written into the message header
+   * @param message payload, may be null for an empty payload
+   * @throws IllegalStateException if connecting to the destination or building / writing the
+   *           message fails
+   */
+  @Override
+  public void send(HelixAddress destination, int messageType, UUID messageId, ByteBuf message) {
+    // Send message
+    try {
+      // Get list of channels
+      List<Channel> channels = channelMap.get(destination.getSocketAddress());
+      if (channels == null) {
+        synchronized (channelMap) {
+          channels = channelMap.get(destination.getSocketAddress());
+          if (channels == null) {
+            channels = new ArrayList<Channel>(config.getNumConnections());
+            for (int i = 0; i < config.getNumConnections(); i++) {
+              channels.add(null);
+            }
+            channelMap.put(destination.getSocketAddress(), channels);
+          }
+        }
+      }
+
+      // Pick the channel for this scope
+      int idx = (Integer.MAX_VALUE & destination.getScope().hashCode()) % channels.size();
+      Channel channel = channels.get(idx);
+      if (channel == null || !channel.isOpen() || isExpired(channel)) {
+        synchronized (channelMap) {
+          channel = channels.get(idx);
+          if (channel == null || !channel.isOpen() || isExpired(channel)) {
+            if (channel != null) {
+              // Drop the old channel's open time, else the map grows with every reconnect
+              channelOpenTimes.remove(channel);
+              if (channel.isOpen()) {
+                channel.close();
+              }
+            }
+            channel = clientBootstrap.connect(destination.getSocketAddress()).sync().channel();
+            channels.set(idx, channel);
+            statChannelOpen.inc();
+            channelOpenTimes.put(channel, System.currentTimeMillis());
+          }
+        }
+      }
+
+      // Compute total length
+      int headerLength =
+          NUM_LENGTH_FIELDS
+              * (Integer.SIZE / 8)
+              + (Integer.SIZE / 8)
+              * 2 // version, type
+              + (Long.SIZE / 8)
+              * 2 // 128 bit UUID
+              + getLength(destination.getScope().getCluster())
+              + getLength(destination.getScope().getResource())
+              + getLength(destination.getScope().getPartition())
+              + getLength(destination.getScope().getState()) + getLength(config.getInstanceName())
+              + getLength(destination.getInstanceName());
+      int messageLength = message == null ? 0 : message.readableBytes();
+
+      // Build message header
+      ByteBuf headerBuf = channel.alloc().buffer(headerLength);
+      headerBuf.writeInt(MESSAGE_VERSION).writeInt(messageType)
+          .writeLong(messageId.getMostSignificantBits())
+          .writeLong(messageId.getLeastSignificantBits());
+      writeStringWithLength(headerBuf, destination.getScope().getCluster());
+      writeStringWithLength(headerBuf, destination.getScope().getResource());
+      writeStringWithLength(headerBuf, destination.getScope().getPartition());
+      writeStringWithLength(headerBuf, destination.getScope().getState());
+      writeStringWithLength(headerBuf, config.getInstanceName());
+      writeStringWithLength(headerBuf, destination.getInstanceName());
+
+      // Compose message header and payload
+      headerBuf.writeInt(messageLength);
+      CompositeByteBuf fullByteBuf = channel.alloc().compositeBuffer(2);
+      fullByteBuf.addComponent(headerBuf);
+      // addComponent does not move the writer index, so it is advanced by hand
+      fullByteBuf.writerIndex(headerBuf.readableBytes());
+      if (message != null) {
+        fullByteBuf.addComponent(message);
+        fullByteBuf.writerIndex(fullByteBuf.writerIndex() + message.readableBytes());
+      }
+
+      // Send
+      statTxMsg.mark();
+      statTxBytes.mark(fullByteBuf.readableBytes());
+      channel.writeAndFlush(fullByteBuf);
+    } catch (Exception e) {
+      statError.inc();
+      throw new IllegalStateException("Could not send message to " + destination, e);
+    }
+  }
+
+  /** Tells whether the channel has a recorded open time that is maxChannelLifeMillis or older. */
+  private boolean isExpired(Channel channel) {
+    Long openedAt = channelOpenTimes.get(channel);
+    if (openedAt == null) {
+      // No open time recorded for this channel: not considered expired
+      return false;
+    }
+    long ageMillis = System.currentTimeMillis() - openedAt;
+    return ageMillis >= config.getMaxChannelLifeMillis();
+  }
+
+  /**
+   * Stores the callback for the given message type, replacing any callback stored earlier for
+   * that type. Received messages are dispatched by type in HelixIPCCallbackHandler.
+   */
+  @Override
+  public void registerCallback(int messageType, HelixIPCCallback callback) {
+    callbacks.put(messageType, callback);
+  }
+
+  /**
+   * Inbound handler that parses one framed message, checks that it is addressed to this instance
+   * and hands it to the callback registered for its message type.
+   */
+  @ChannelHandler.Sharable
+  private class HelixIPCCallbackHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+    HelixIPCCallbackHandler() {
+      super(false); // we will manage reference
+    }
+
+    /**
+     * Decodes the header fields in wire order, positions the buffer at the payload and invokes
+     * the callback. The buffer is released when this method exits, whether or not the callback
+     * succeeded.
+     * @throws IllegalStateException if the destination is missing or is another instance, or if
+     *           no callback is registered for the message type
+     * @throws IllegalArgumentException if a length field is rejected by checkLength
+     */
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
+      try {
+        // Absolute offset of the next field; toNonEmptyString does not advance the reader
+        // index, so it is re-positioned from idx before every field
+        int idx = 0;
+
+        // Message length
+        int messageLength = byteBuf.readInt();
+        idx += 4;
+
+        // Message version
+        @SuppressWarnings("unused")
+        int messageVersion = byteBuf.readInt();
+        idx += 4;
+
+        // Message type
+        int messageType = byteBuf.readInt();
+        idx += 4;
+
+        // Message ID
+        UUID messageId = new UUID(byteBuf.readLong(), byteBuf.readLong());
+        idx += 16;
+
+        // Cluster
+        byteBuf.readerIndex(idx);
+        int clusterSize = byteBuf.readInt();
+        idx += 4;
+        checkLength("clusterSize", clusterSize, messageLength);
+        String clusterName = toNonEmptyString(clusterSize, byteBuf);
+        idx += clusterSize;
+
+        // Resource
+        byteBuf.readerIndex(idx);
+        int resourceSize = byteBuf.readInt();
+        idx += 4;
+        checkLength("resourceSize", resourceSize, messageLength);
+        String resourceName = toNonEmptyString(resourceSize, byteBuf);
+        idx += resourceSize;
+
+        // Partition
+        byteBuf.readerIndex(idx);
+        int partitionSize = byteBuf.readInt();
+        idx += 4;
+        checkLength("partitionSize", partitionSize, messageLength);
+        String partitionName = toNonEmptyString(partitionSize, byteBuf);
+        idx += partitionSize;
+
+        // State
+        byteBuf.readerIndex(idx);
+        int stateSize = byteBuf.readInt();
+        idx += 4;
+        checkLength("stateSize", stateSize, messageLength);
+        String state = toNonEmptyString(stateSize, byteBuf);
+        idx += stateSize;
+
+        // Source instance
+        byteBuf.readerIndex(idx);
+        int srcInstanceSize = byteBuf.readInt();
+        idx += 4;
+        checkLength("srcInstanceSize", srcInstanceSize, messageLength);
+        String srcInstance = toNonEmptyString(srcInstanceSize, byteBuf);
+        idx += srcInstanceSize;
+
+        // Destination instance
+        byteBuf.readerIndex(idx);
+        int dstInstanceSize = byteBuf.readInt();
+        idx += 4;
+        checkLength("dstInstanceSize", dstInstanceSize, messageLength);
+        String dstInstance = toNonEmptyString(dstInstanceSize, byteBuf);
+        idx += dstInstanceSize;
+
+        // Position at message (skips the 4B payload length field)
+        byteBuf.readerIndex(idx + 4);
+
+        // Get callback (looked up once, then validated together with the addressing)
+        HelixIPCCallback callback = callbacks.get(messageType);
+
+        // Error check
+        if (dstInstance == null) {
+          throw new IllegalStateException("Received message addressed to null destination from "
+              + srcInstance);
+        } else if (!dstInstance.equals(config.getInstanceName())) {
+          throw new IllegalStateException(config.getInstanceName()
+              + " received message addressed to " + dstInstance + " from " + srcInstance);
+        } else if (callback == null) {
+          throw new IllegalStateException("No callback registered for message type " + messageType);
+        }
+
+        // Build scope
+        HelixMessageScope scope =
+            new HelixMessageScope.Builder().cluster(clusterName).resource(resourceName)
+                .partition(partitionName).state(state).sourceInstance(srcInstance).build();
+
+        // Handle callback
+        callback.onMessage(scope, messageId, byteBuf);
+
+        // Stats
+        statRxMsg.mark();
+        statRxBytes.mark(messageLength);
+      } finally {
+        byteBuf.release();
+      }
+    }
+
+    /** Logs the failure together with its stack trace. */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause) {
+      // Passing the throwable as second argument keeps the stack trace in the log
+      LOG.error("Error while handling IPC message", cause);
+    }
+  }
+
+  /** Given a byte buf w/ a certain reader index, encodes the next length bytes as a String */
+  private static String toNonEmptyString(int length, ByteBuf byteBuf) {
+    if (byteBuf.readableBytes() >= length) {
+      return byteBuf.toString(byteBuf.readerIndex(), length, Charset.defaultCharset());
+    }
+    return null;
+  }
+
+  /** Writes [s.length(), s] to buf, or [0] if s is null */
+  private static void writeStringWithLength(ByteBuf buf, String s) {
+    if (s == null) {
+      buf.writeInt(0);
+      return;
+    }
+
+    buf.writeInt(s.length());
+    for (int i = 0; i < s.length(); i++) {
+      buf.writeByte(s.charAt(i));
+    }
+  }
+
+  /** Returns the length of a string, or 0 if s is null */
+  private static int getLength(String s) {
+    return s == null ? 0 : s.length();
+  }
+
+  /**
+   * Validates a length field read from an incoming message.
+   * @param fieldName name of the field, used in the exception message
+   * @param length value of the length field
+   * @param messageLength total length of the message the field belongs to
+   * @throws java.lang.IllegalArgumentException if length is negative or > messageLength (attempt
+   *           to prevent OOM exceptions)
+   */
+  private static void checkLength(String fieldName, int length, int messageLength)
+      throws IllegalArgumentException {
+    // A corrupt or hostile length field may just as well be negative
+    if (length < 0) {
+      throw new IllegalArgumentException(fieldName + "=" + length + " is negative");
+    }
+    if (length > messageLength) {
+      throw new IllegalArgumentException(fieldName + "=" + length
+          + " is greater than messageLength=" + messageLength);
+    }
+  }
+
+  /** Mutable settings of a NettyHelixIPCService; every setter returns this config for chaining. */
+  public static class Config {
+    // Written as source instance into outgoing messages and compared with the destination
+    // instance of incoming ones
+    private String instanceName;
+    // Port the message handling server binds to
+    private int port;
+    // Size of the channel list kept per destination address
+    private int numConnections = 1;
+    // Age after which a channel is replaced by a new connection on the next send
+    private long maxChannelLifeMillis = 5000;
+
+    public String getInstanceName() {
+      return instanceName;
+    }
+
+    public Config setInstanceName(String instanceName) {
+      this.instanceName = instanceName;
+      return this;
+    }
+
+    public int getPort() {
+      return port;
+    }
+
+    public Config setPort(int port) {
+      this.port = port;
+      return this;
+    }
+
+    public int getNumConnections() {
+      return numConnections;
+    }
+
+    public Config setNumConnections(int numConnections) {
+      this.numConnections = numConnections;
+      return this;
+    }
+
+    public long getMaxChannelLifeMillis() {
+      return maxChannelLifeMillis;
+    }
+
+    public Config setMaxChannelLifeMillis(long maxChannelLifeMillis) {
+      this.maxChannelLifeMillis = maxChannelLifeMillis;
+      return this;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java
new file mode 100644
index 0000000..c0fd2bb
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java
@@ -0,0 +1,295 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * A basic implementation of a resolver in terms of expiring routing tables.
+ * <p>
+ * One {@code Spectator} (a Helix manager plus its routing table) is cached per cluster name.
+ * Fetching a spectator's routing table renews its lease; when the lease lapses the spectator
+ * removes itself from the cache and disconnects its manager. Subclasses supply the manager
+ * through {@link #createManager(String)}.
+ */
+public abstract class AbstractHelixResolver implements HelixResolver {
+  private static final Logger LOG = Logger.getLogger(AbstractHelixResolver.class);
+  private static final int DEFAULT_THREAD_POOL_SIZE = 10;
+  private static final long DEFAULT_LEASE_LENGTH_MS = 60 * 60 * 1000; // TODO: are these good
+                                                                      // values?
+  /** Simple field of an instance config record that holds the participant's IPC port */
+  private static final String IPC_PORT = "IPC_PORT";
+  /** Cached spectators keyed by cluster name; mutations are guarded by its own monitor */
+  private final Map<String, Spectator> _connections;
+  private boolean _isConnected;
+  /** Runs lease-expiry tasks; created by {@link #connect()}, so null until then */
+  private ScheduledExecutorService _executor;
+
+  protected AbstractHelixResolver() {
+    _connections = Maps.newHashMap();
+    _isConnected = false;
+  }
+
+  /**
+   * Create the lease-expiry executor and mark this resolver as connected. Cluster connections
+   * themselves are opened lazily on the first resolution that needs them.
+   */
+  @Override
+  public void connect() {
+    _executor = Executors.newScheduledThreadPool(DEFAULT_THREAD_POOL_SIZE);
+    _isConnected = true;
+  }
+
+  /**
+   * Shut down every cached spectator and the lease-expiry executor. Safe to call even if
+   * {@link #connect()} was never called.
+   */
+  @Override
+  public void disconnect() {
+    synchronized (_connections) {
+      // Iterate over a snapshot: Spectator.shutdown() removes the spectator's own entry from
+      // _connections, which would otherwise fail with ConcurrentModificationException as soon as
+      // more than one cluster is cached.
+      for (Spectator connection : new HashSet<Spectator>(_connections.values())) {
+        connection.shutdown();
+      }
+      _connections.clear();
+    }
+    if (_executor != null) {
+      _executor.shutdown();
+    }
+    _isConnected = false;
+  }
+
+  /**
+   * Resolve a scope to the IPC addresses of every participant it matches.
+   * <p>
+   * A resource, partition, or state left unset in the scope matches everything the routing table
+   * currently reports for that dimension. Participants whose instance config has no
+   * {@code IPC_PORT} simple field are logged and skipped.
+   * @param scope the scope to resolve
+   * @return the matching addresses; empty if the scope is invalid or this resolver is not
+   *         connected
+   * @throws NumberFormatException if a participant's {@code IPC_PORT} value is not an integer
+   */
+  @Override
+  public Set<HelixAddress> getDestinations(HelixMessageScope scope) {
+    if (!scope.isValid()) {
+      LOG.error("Scope " + scope + " is not valid!");
+      return new HashSet<HelixAddress>();
+    } else if (!_isConnected) {
+      LOG.error("Cannot resolve " + scope + " without first connecting!");
+      return new HashSet<HelixAddress>();
+    }
+
+    // Connect or refresh connection
+    ResolverRoutingTable routingTable = getConnection(scope.getCluster()).getRoutingTable();
+
+    // Resolve all resources, either explicitly or match all
+    Set<String> resources;
+    if (scope.getResource() != null) {
+      resources = Sets.newHashSet(scope.getResource());
+    } else {
+      resources = routingTable.getResources();
+    }
+
+    // Resolve all partitions
+    Map<String, Set<String>> partitionMap = Maps.newHashMap();
+    for (String resource : resources) {
+      if (scope.getPartition() != null) {
+        partitionMap.put(resource, Sets.newHashSet(scope.getPartition()));
+      } else {
+        partitionMap.put(resource, routingTable.getPartitions(resource));
+      }
+    }
+
+    // Resolve all states
+    Set<String> states;
+    if (scope.getState() != null) {
+      states = Sets.newHashSet(scope.getState());
+    } else {
+      states = routingTable.getStates();
+    }
+
+    // Get all the participants that match
+    Set<InstanceConfig> participants = Sets.newHashSet();
+    for (String resource : resources) {
+      for (String partition : partitionMap.get(resource)) {
+        for (String state : states) {
+          participants.addAll(routingTable.getInstances(resource, partition, state));
+        }
+      }
+    }
+
+    // Resolve those participants
+    Set<HelixAddress> result = new HashSet<HelixAddress>();
+    for (InstanceConfig participant : participants) {
+      String ipcPort = participant.getRecord().getSimpleField(IPC_PORT);
+      if (ipcPort == null) {
+        LOG.error("No ipc address registered for target instance " + participant.getInstanceName()
+            + ", skipping");
+      } else {
+        result.add(new HelixAddress(scope, participant.getInstanceName(), new InetSocketAddress(
+            participant.getHostName(), Integer.parseInt(ipcPort))));
+      }
+    }
+
+    return result;
+  }
+
+  /**
+   * Resolve the IPC address of the scope's source instance.
+   * @param scope the scope whose source instance should be resolved
+   * @return the source address, or null if the scope names no source instance or this resolver is
+   *         not connected
+   * @throws IllegalStateException if the source instance has no instance config in the routing
+   *           table, or that config has no {@code IPC_PORT} simple field
+   * @throws NumberFormatException if the {@code IPC_PORT} value is not an integer
+   */
+  @Override
+  public HelixAddress getSource(HelixMessageScope scope) {
+    if (!_isConnected) {
+      // Without connect() there is no executor to schedule the lease on; bail out before opening
+      // a manager connection that could never be expired.
+      LOG.error("Cannot resolve source of " + scope + " without first connecting!");
+      return null;
+    }
+
+    // Connect or refresh connection
+    ResolverRoutingTable routingTable = getConnection(scope.getCluster()).getRoutingTable();
+
+    String sourceInstance = scope.getSourceInstance();
+    if (sourceInstance == null) {
+      return null;
+    }
+
+    InstanceConfig config = routingTable.getInstanceConfig(sourceInstance);
+    if (config == null) {
+      // NOTE(review): assumes the routing table reports an unknown instance as null -- confirm
+      throw new IllegalStateException("No instance config found for source instance "
+          + sourceInstance);
+    }
+    String ipcPort = config.getRecord().getSimpleField(IPC_PORT);
+    if (ipcPort == null) {
+      throw new IllegalStateException("No IPC address registered for source instance "
+          + sourceInstance);
+    }
+    return new HelixAddress(scope, sourceInstance, new InetSocketAddress(config.getHostName(),
+        Integer.parseInt(ipcPort)));
+  }
+
+  @Override
+  public boolean isConnected() {
+    return _isConnected;
+  }
+
+  /**
+   * Create a Helix manager connection based on the appropriate backing store
+   * @param cluster the name of the cluster to connect to
+   * @return HelixManager instance
+   */
+  protected abstract HelixManager createManager(String cluster);
+
+  /**
+   * Return the cached spectator for a cluster, creating and initializing a new one if none is
+   * cached or the cached one's manager is no longer connected.
+   * @param cluster the name of the cluster to spectate
+   * @return the spectator currently cached for the cluster
+   */
+  private Spectator getConnection(String cluster) {
+    Spectator connection = _connections.get(cluster);
+    if (connection == null || !connection.getManager().isConnected()) {
+      synchronized (_connections) {
+        // Re-check under the lock; another thread may have replaced the entry already
+        connection = _connections.get(cluster);
+        if (connection == null || !connection.getManager().isConnected()) {
+          if (connection != null) {
+            // Retire the stale spectator so its pending lease timer is cancelled instead of
+            // firing later against the replacement.
+            connection.shutdown();
+          }
+          connection = new Spectator(cluster, DEFAULT_LEASE_LENGTH_MS);
+          connection.init();
+          _connections.put(cluster, connection);
+        }
+      }
+    }
+    return connection;
+  }
+
+  /**
+   * A leased connection to one cluster: the Helix manager, the routing table it feeds, and the
+   * timer task that expires the connection when the lease is not renewed.
+   */
+  private class Spectator {
+    private final String _cluster;
+    private final HelixManager _manager;
+    private final ResolverRoutingTable _routingTable;
+    private final long _leaseLengthMs;
+    /** Pending expiry task, or null until the first renewal */
+    private ScheduledFuture<?> _future;
+
+    /**
+     * Initialize a spectator. This does not automatically connect.
+     * @param cluster the cluster to spectate
+     * @param leaseLengthMs the expiry of this spectator after the last request
+     */
+    public Spectator(String cluster, long leaseLengthMs) {
+      _cluster = cluster;
+      _manager = createManager(cluster);
+      _leaseLengthMs = leaseLengthMs;
+      _routingTable = new ResolverRoutingTable();
+    }
+
+    /**
+     * Connect and initialize the routing table. Failures are logged rather than propagated, so
+     * the manager may be left disconnected.
+     */
+    public void init() {
+      try {
+        _manager.connect();
+        _manager.addExternalViewChangeListener(_routingTable);
+        _manager.addInstanceConfigChangeListener(_routingTable);
+
+        // Force an initial refresh
+        HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+        List<ExternalView> externalViews =
+            accessor.getChildValues(accessor.keyBuilder().externalViews());
+        NotificationContext context = new NotificationContext(_manager);
+        context.setType(NotificationContext.Type.INIT);
+        _routingTable.onExternalViewChange(externalViews, context);
+        List<InstanceConfig> instanceConfigs =
+            accessor.getChildValues(accessor.keyBuilder().instanceConfigs());
+        _routingTable.onInstanceConfigChange(instanceConfigs, context);
+      } catch (Exception e) {
+        LOG.error("Error setting up routing table", e);
+      }
+    }
+
+    /**
+     * Clean up the connection to the spectated cluster
+     */
+    public void shutdown() {
+      resetFuture();
+      expire();
+    }
+
+    /**
+     * Get the dynamically-updating routing table for this cluster. Each call renews the lease.
+     * @return ResolverRoutingTable, a RoutingTableProvider that can answer questions about its
+     *         contents
+     */
+    public ResolverRoutingTable getRoutingTable() {
+      renew();
+      return _routingTable;
+    }
+
+    public HelixManager getManager() {
+      return _manager;
+    }
+
+    /**
+     * Restart the lease: cancel any pending expiry and schedule a new one a full lease length
+     * from now.
+     */
+    private synchronized void renew() {
+      resetFuture();
+
+      // Schedule this connection to expire if not renewed quickly enough
+      _future = _executor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          expire();
+        }
+      }, _leaseLengthMs, TimeUnit.MILLISECONDS);
+
+    }
+
+    /**
+     * Cancel the pending expiry task, if there is one that has not completed.
+     */
+    private synchronized void resetFuture() {
+      if (_future != null && !_future.isDone()) {
+        _future.cancel(true);
+      }
+    }
+
+    /**
+     * Remove this spectator from the cache and disconnect its manager if it is still connected.
+     */
+    private void expire() {
+      synchronized (_connections) {
+        // Only drop the mapping if it still points at this spectator; a replacement registered
+        // under the same cluster name must not be evicted by its predecessor.
+        if (_connections.get(_cluster) == this) {
+          _connections.remove(_cluster);
+        }
+        if (_manager != null && _manager.isConnected()) {
+          _manager.disconnect();
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java
new file mode 100644
index 0000000..657e8bd
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java
@@ -0,0 +1,70 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Objects;
+
+import java.net.InetSocketAddress;
+
+/**
+ * An immutable triple of the scope an address was resolved for, the name of the instance it
+ * belongs to, and the socket address of that instance. Any component may be null; equality,
+ * hashing, and string conversion all tolerate null components.
+ */
+public class HelixAddress {
+
+  private final HelixMessageScope scope;
+  private final String instanceName;
+  private final InetSocketAddress socketAddress;
+
+  /**
+   * @param scope the scope this address was resolved for
+   * @param instanceName the name of the instance at this address
+   * @param socketAddress the socket address of the instance
+   */
+  public HelixAddress(HelixMessageScope scope, String instanceName, InetSocketAddress socketAddress) {
+    this.scope = scope;
+    this.instanceName = instanceName;
+    this.socketAddress = socketAddress;
+  }
+
+  public HelixMessageScope getScope() {
+    return scope;
+  }
+
+  public String getInstanceName() {
+    return instanceName;
+  }
+
+  public InetSocketAddress getSocketAddress() {
+    return socketAddress;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this).addValue(scope).addValue(instanceName)
+        .addValue(socketAddress).toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(scope, instanceName, socketAddress);
+  }
+
+  /**
+   * Two addresses are equal when their scope, instance name, and socket address are all equal.
+   * The comparison is null-safe, matching {@link #hashCode()}, so an address with a null
+   * component can still be compared or stored in a hash-based collection.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (!(o instanceof HelixAddress)) {
+      return false;
+    }
+    HelixAddress a = (HelixAddress) o;
+    return Objects.equal(scope, a.scope) && Objects.equal(instanceName, a.instanceName)
+        && Objects.equal(socketAddress, a.socketAddress);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java
new file mode 100644
index 0000000..ba06556
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java
@@ -0,0 +1,151 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Objects;
+
+/**
+ * Describes which part of a cluster a message is addressed to: a cluster plus an optional
+ * resource, partition, and state, together with an optional source instance. Instances are
+ * immutable and are created through {@link Builder}.
+ */
+public class HelixMessageScope {
+  private final String _cluster;
+  private final String _resource;
+  private final String _partition;
+  private final String _state;
+  private final String _srcInstance;
+
+  private HelixMessageScope(String cluster, String resource, String partition, String state,
+      String srcInstance) {
+    _cluster = cluster;
+    _resource = resource;
+    _partition = partition;
+    _state = state;
+    _srcInstance = srcInstance;
+  }
+
+  /**
+   * Renders the cluster, resource, partition, and state; the source instance is not included.
+   */
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this).addValue(_cluster).addValue(_resource)
+        .addValue(_partition).addValue(_state).toString();
+  }
+
+  /**
+   * Hashes the same four fields that {@link #equals(Object)} compares.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(_cluster, _resource, _partition, _state);
+  }
+
+  /**
+   * Scopes are equal when cluster, resource, partition, and state all match. The source instance
+   * does not take part in the comparison.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof HelixMessageScope)) {
+      return false;
+    }
+    HelixMessageScope that = (HelixMessageScope) other;
+    return Objects.equal(_cluster, that._cluster) && Objects.equal(_resource, that._resource)
+        && Objects.equal(_partition, that._partition) && Objects.equal(_state, that._state);
+  }
+
+  public String getCluster() {
+    return _cluster;
+  }
+
+  public String getResource() {
+    return _resource;
+  }
+
+  public String getPartition() {
+    return _partition;
+  }
+
+  public String getState() {
+    return _state;
+  }
+
+  public String getSourceInstance() {
+    return _srcInstance;
+  }
+
+  /**
+   * Check that the scope is well-formed.
+   * @return true if a cluster is set and, whenever a partition is set, a resource is set too
+   */
+  public boolean isValid() {
+    if (_cluster == null) {
+      return false;
+    }
+    // A partition can only be named together with the resource it belongs to
+    return _partition == null || _resource != null;
+  }
+
+  /**
+   * Creator for a HelixMessageScope
+   */
+  public static class Builder {
+    private String _cluster;
+    private String _resource;
+    private String _partition;
+    private String _state;
+    private String _sourceInstance;
+
+    /**
+     * Associate the scope with a cluster
+     * @param cluster the cluster to scope routing to
+     * @return Builder
+     */
+    public Builder cluster(String cluster) {
+      this._cluster = cluster;
+      return this;
+    }
+
+    /**
+     * Associate the scope with a resource
+     * @param resource a resource served by the cluster
+     * @return Builder
+     */
+    public Builder resource(String resource) {
+      this._resource = resource;
+      return this;
+    }
+
+    /**
+     * Associate the scope with a partition
+     * @param partition a specific partition of the scoped resource
+     * @return Builder
+     */
+    public Builder partition(String partition) {
+      this._partition = partition;
+      return this;
+    }
+
+    /**
+     * Associate the scope with a state
+     * @param state a state that a resource in the cluster can be in
+     * @return Builder
+     */
+    public Builder state(String state) {
+      this._state = state;
+      return this;
+    }
+
+    /**
+     * Associate the scope with a source instance
+     * @param sourceInstance the instance name to record as the source of the scope
+     * @return Builder
+     */
+    public Builder sourceInstance(String sourceInstance) {
+      this._sourceInstance = sourceInstance;
+      return this;
+    }
+
+    /**
+     * Create the scope
+     * @return HelixMessageScope instance corresponding to the built scope
+     */
+    public HelixMessageScope build() {
+      return new HelixMessageScope(_cluster, _resource, _partition, _state, _sourceInstance);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java
new file mode 100644
index 0000000..b7f2f3e
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java
@@ -0,0 +1,47 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Set;
+
+/**
+ * An interface that resolves a message scope to a direct address.
+ */
+public interface HelixResolver {
+  /**
+   * Initialize a connection for scope resolution.
+   */
+  void connect();
+
+  /**
+   * Tear down any state and open connections to Helix clusters.
+   */
+  void disconnect();
+
+  /**
+   * Check the connection status
+   * @return true if connected, false otherwise
+   */
+  boolean isConnected();
+
+  /**
+   * Resolve a scope to the addresses of the destinations it matches.
+   * @param scope the scope to resolve
+   * @return the set of addresses the scope resolves to
+   */
+  Set<HelixAddress> getDestinations(HelixMessageScope scope);
+
+  /**
+   * Resolve the address of the source described by a scope.
+   * @param scope the scope whose source should be resolved
+   * @return the address of the source
+   */
+  HelixAddress getSource(HelixMessageScope scope);
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java b/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java
new file mode 100644
index 0000000..2bc5413
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java
@@ -0,0 +1,92 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.spectator.RoutingTableProvider;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * A routing table that can also return all resources, partitions, and states in the cluster.
+ * The extra indexes are rebuilt from scratch on every external view change.
+ */
+public class ResolverRoutingTable extends RoutingTableProvider {
+  // Resource name -> partition names, as reported by the most recent external views
+  Map<String, Set<String>> _resourceMap;
+  // Every state value seen in the most recent external views
+  Set<String> _stateSet;
+
+  /**
+   * Create the table.
+   */
+  public ResolverRoutingTable() {
+    super();
+    _resourceMap = Maps.newHashMap();
+    _stateSet = Sets.newHashSet();
+  }
+
+  /**
+   * Get all resources that are currently served in the cluster.
+   * @return a fresh copy of the set of resource names
+   */
+  public synchronized Set<String> getResources() {
+    Set<String> resourceNames = _resourceMap.keySet();
+    return Sets.newHashSet(resourceNames);
+  }
+
+  /**
+   * Get all partitions currently served for a resource.
+   * @param resource the resource for which to look up partitions
+   * @return a fresh copy of the set of partition names, or an empty set if the resource is not
+   *         known
+   */
+  public synchronized Set<String> getPartitions(String resource) {
+    if (!_resourceMap.containsKey(resource)) {
+      return Collections.emptySet();
+    }
+    return Sets.newHashSet(_resourceMap.get(resource));
+  }
+
+  /**
+   * Get all states that partitions of all resources are currently in
+   * @return a fresh copy of the set of state names
+   */
+  public synchronized Set<String> getStates() {
+    return Sets.newHashSet(_stateSet);
+  }
+
+  /**
+   * Let the parent routing table process the change, then rebuild the resource-to-partition map
+   * and the set of states from the supplied external views.
+   */
+  @Override
+  public synchronized void onExternalViewChange(List<ExternalView> externalViewList,
+      NotificationContext changeContext) {
+    super.onExternalViewChange(externalViewList, changeContext);
+
+    // Start over: resources or states missing from the new views must disappear
+    _resourceMap.clear();
+    _stateSet.clear();
+
+    for (ExternalView view : externalViewList) {
+      Set<String> partitions = view.getPartitionSet();
+      _resourceMap.put(view.getResourceName(), partitions);
+      for (String partitionName : partitions) {
+        _stateSet.addAll(view.getStateMap(partitionName).values());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java
new file mode 100644
index 0000000..48fa81a
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java
@@ -0,0 +1,45 @@
+package org.apache.helix.resolver.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.resolver.AbstractHelixResolver;
+
+/**
+ * A ZooKeeper-specific {@link org.apache.helix.resolver.HelixResolver}
+ */
+public class ZKHelixResolver extends AbstractHelixResolver {
+  private final String _zkConnectString;
+
+  /**
+   * Create a ZK-based Helix resolver
+   * @param zkConnectString the connection string to the ZooKeeper ensemble
+   */
+  public ZKHelixResolver(String zkConnectString) {
+    this._zkConnectString = zkConnectString;
+  }
+
+  /**
+   * Build a spectator-type manager for the cluster against the configured ZooKeeper ensemble.
+   * @param cluster the name of the cluster to connect to
+   * @return the manager produced by {@link HelixManagerFactory}
+   */
+  @Override
+  protected HelixManager createManager(String cluster) {
+    // The second argument is passed as null (presumably the instance name -- confirm)
+    HelixManager spectator =
+        HelixManagerFactory.getZKHelixManager(cluster, null, InstanceType.SPECTATOR,
+            _zkConnectString);
+    return spectator;
+  }
+}


Mime
View raw message