Return-Path:
X-Original-To: apmail-helix-commits-archive@minotaur.apache.org
Delivered-To: apmail-helix-commits-archive@minotaur.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by minotaur.apache.org (Postfix) with SMTP id 0BBA2118CB
for ;
Thu, 28 Aug 2014 18:00:16 +0000 (UTC)
Received: (qmail 51743 invoked by uid 500); 28 Aug 2014 18:00:15 -0000
Delivered-To: apmail-helix-commits-archive@helix.apache.org
Received: (qmail 51668 invoked by uid 500); 28 Aug 2014 18:00:15 -0000
Mailing-List: contact commits-help@helix.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@helix.apache.org
Delivered-To: mailing list commits@helix.apache.org
Received: (qmail 51595 invoked by uid 99); 28 Aug 2014 18:00:15 -0000
Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org)
(140.211.11.114)
by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Aug 2014 18:00:15 +0000
Received: by tyr.zones.apache.org (Postfix, from userid 65534)
id 80B51A04F67; Thu, 28 Aug 2014 18:00:15 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: kanak@apache.org
To: commits@helix.apache.org
Date: Thu, 28 Aug 2014 18:00:16 -0000
Message-Id: <4da9f5cffb7642baaee269968cd458bd@git.apache.org>
In-Reply-To:
References:
X-Mailer: ASF-Git Admin Mailer
Subject: [2/4] git commit: [HELIX-470] Netty-based IPC layer
[HELIX-470] Netty-based IPC layer
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f2475fa9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f2475fa9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f2475fa9
Branch: refs/heads/master
Commit: f2475fa9a6123052fea2588cdd4e439ddc7af020
Parents: bfe5d2d
Author: Greg Brandt
Authored: Tue Aug 26 13:14:36 2014 -0700
Committer: Greg Brandt
Committed: Tue Aug 26 13:14:36 2014 -0700
----------------------------------------------------------------------
.../helix/spectator/RoutingTableProvider.java | 24 +-
.../test/java/org/apache/helix/TestHelper.java | 10 +
helix-ipc/LICENSE | 273 +++++++++++
helix-ipc/NOTICE | 33 ++
helix-ipc/pom.xml | 141 ++++++
helix-ipc/src/assemble/assembly.xml | 60 +++
helix-ipc/src/main/config/log4j.properties | 31 ++
.../org/apache/helix/ipc/HelixIPCCallback.java | 32 ++
.../helix/ipc/HelixIPCMessageManager.java | 163 ++++++
.../org/apache/helix/ipc/HelixIPCService.java | 49 ++
.../helix/ipc/netty/NettyHelixIPCService.java | 490 +++++++++++++++++++
.../helix/resolver/AbstractHelixResolver.java | 295 +++++++++++
.../org/apache/helix/resolver/HelixAddress.java | 70 +++
.../helix/resolver/HelixMessageScope.java | 151 ++++++
.../apache/helix/resolver/HelixResolver.java | 47 ++
.../helix/resolver/ResolverRoutingTable.java | 92 ++++
.../helix/resolver/zk/ZKHelixResolver.java | 45 ++
.../helix/ipc/TestNettyHelixIPCService.java | 353 +++++++++++++
.../helix/ipc/benchmark/BenchmarkDriver.java | 221 +++++++++
.../helix/resolver/TestZKHelixResolver.java | 161 ++++++
helix-ipc/src/test/resources/build_benchmark.sh | 29 ++
helix-ipc/src/test/resources/run_benchmark.sh | 37 ++
pom.xml | 1 +
23 files changed, 2806 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 7799ca1..ccce64a 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -38,7 +38,8 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.log4j.Logger;
-public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener {
+public class RoutingTableProvider implements ExternalViewChangeListener,
+ InstanceConfigChangeListener {
private static final Logger logger = Logger.getLogger(RoutingTableProvider.class);
private final AtomicReference _routingTableRef;
@@ -91,6 +92,15 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
return instanceSet;
}
+ /**
+ * Get the configuration of an instance from its name
+ * @param instanceName the instance ID
+ * @return InstanceConfig if present, null otherwise
+ */
+ public InstanceConfig getInstanceConfig(String instanceName) {
+ return _routingTableRef.get().getConfig(instanceName);
+ }
+
@Override
public void onExternalViewChange(List externalViewList,
NotificationContext changeContext) {
@@ -124,12 +134,13 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
Builder keyBuilder = accessor.keyBuilder();
+ RoutingTable newRoutingTable = new RoutingTable();
List configList = accessor.getChildValues(keyBuilder.instanceConfigs());
Map instanceConfigMap = new HashMap();
for (InstanceConfig config : configList) {
instanceConfigMap.put(config.getId(), config);
+ newRoutingTable.addConfig(config);
}
- RoutingTable newRoutingTable = new RoutingTable();
if (externalViewList != null) {
for (ExternalView extView : externalViewList) {
String resourceName = extView.getId();
@@ -154,9 +165,11 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
class RoutingTable {
private final HashMap resourceInfoMap;
+ private final Map instanceConfigMap;
public RoutingTable() {
resourceInfoMap = new HashMap();
+ instanceConfigMap = new HashMap();
}
public void addEntry(String resourceName, String partitionName, String state,
@@ -169,10 +182,17 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
}
+ public void addConfig(InstanceConfig config) {
+ instanceConfigMap.put(config.getInstanceName(), config);
+ }
+
ResourceInfo get(String resourceName) {
return resourceInfoMap.get(resourceName);
}
+ InstanceConfig getConfig(String instanceName) {
+ return instanceConfigMap.get(instanceName);
+ }
}
class ResourceInfo {
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 4a9139f..64796ba 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -20,7 +20,9 @@ package org.apache.helix;
*/
import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Method;
+import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -769,4 +771,12 @@ public class TestHelper {
System.out.println(sb.toString());
}
+ public static int getRandomPort() throws IOException {
+ ServerSocket sock = new ServerSocket();
+ sock.bind(null);
+ int port = sock.getLocalPort();
+ sock.close();
+ return port;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/LICENSE
----------------------------------------------------------------------
diff --git a/helix-ipc/LICENSE b/helix-ipc/LICENSE
new file mode 100644
index 0000000..413913f
--- /dev/null
+++ b/helix-ipc/LICENSE
@@ -0,0 +1,273 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
+
+For xstream:
+
+Copyright (c) 2003-2006, Joe Walnes
+Copyright (c) 2006-2009, 2011 XStream Committers
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+1. Redistributions of source code must retain the above copyright notice, this list of
+conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright notice, this list of
+conditions and the following disclaimer in the documentation and/or other materials provided
+with the distribution.
+
+3. Neither the name of XStream nor the names of its contributors may be used to endorse
+or promote products derived from this software without specific prior written
+permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
+EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
+TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
+WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGE.
+
+for jline:
+
+Copyright (c) 2002-2006, Marc Prud'hommeaux
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/NOTICE
----------------------------------------------------------------------
diff --git a/helix-ipc/NOTICE b/helix-ipc/NOTICE
new file mode 100644
index 0000000..1ee0d24
--- /dev/null
+++ b/helix-ipc/NOTICE
@@ -0,0 +1,33 @@
+Apache Helix
+Copyright 2014 The Apache Software Foundation
+
+
+I. Included Software
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+Codehaus (http://www.codehaus.org/).
+Licensed under the BSD License.
+
+This product includes software developed at
+jline (http://jline.sourceforge.net/).
+Licensed under the BSD License.
+
+This product includes software developed at
+Google (http://www.google.com/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+snakeyaml (http://www.snakeyaml.org/).
+Licensed under the Apache License 2.0.
+
+This product includes software developed at
+zkclient (https://github.com/sgroschupf/zkclient).
+Licensed under the Apache License 2.0.
+
+II. License Summary
+- Apache License 2.0
+- BSD License
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/pom.xml
----------------------------------------------------------------------
diff --git a/helix-ipc/pom.xml b/helix-ipc/pom.xml
new file mode 100644
index 0000000..90ab11d
--- /dev/null
+++ b/helix-ipc/pom.xml
@@ -0,0 +1,141 @@
+
+
+
+
+ org.apache.helix
+ helix
+ 0.7.1-SNAPSHOT
+
+ 4.0.0
+
+ helix-ipc
+ bundle
+
+ Apache Helix :: IPC
+
+
+
+ javax.management*,
+ org.apache.commons.math*;version="[2.1,3)",
+ org.apache.log4j*;version="[1.2,2)",
+ org.restlet;version="[2.1.4,3]",
+ *
+
+
+ org.apache.helix.tools*
+
+ org.apache.helix*;version="${project.version};-noimport:=true
+
+
+
+
+ org.apache.helix
+ helix-core
+
+
+ io.netty
+ netty-all
+ 4.0.21.Final
+
+
+ com.codahale.metrics
+ metrics-core
+ 3.0.1
+
+
+ org.apache.helix
+ helix-core
+ test-jar
+ test
+
+
+ org.testng
+ testng
+ test
+
+
+ junit
+ junit
+
+
+
+
+
+
+
+ ${basedir}/src/main/resources
+ true
+
+
+ ${basedir}
+
+ DISCLAIMER
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+
+ org.codehaus.mojo
+ appassembler-maven-plugin
+
+
+
+ org.apache.helix.ipc.netty.NettyHelixIPCService
+ netty-helix-ipc-service
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+
+
+ jar-with-dependencies
+
+
+
+
+ package
+
+ single
+
+
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/assemble/assembly.xml
----------------------------------------------------------------------
diff --git a/helix-ipc/src/assemble/assembly.xml b/helix-ipc/src/assemble/assembly.xml
new file mode 100644
index 0000000..c2d08a1
--- /dev/null
+++ b/helix-ipc/src/assemble/assembly.xml
@@ -0,0 +1,60 @@
+
+
+
+ pkg
+
+ tar
+
+
+
+ ${project.build.directory}/${project.artifactId}-pkg/bin
+ bin
+ unix
+ 0755
+ 0755
+
+
+ ${project.build.directory}/${project.artifactId}-pkg/repo/
+ repo
+ 0755
+ 0755
+
+ **/*.xml
+
+
+
+ ${project.build.directory}/${project.artifactId}-pkg/conf
+ conf
+ unix
+ 0755
+ 0755
+
+
+ ${project.basedir}
+ /
+
+ LICENSE
+ NOTICE
+ DISCLAIMER
+
+ 0755
+
+
+
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/config/log4j.properties b/helix-ipc/src/main/config/log4j.properties
new file mode 100644
index 0000000..4b3dc31
--- /dev/null
+++ b/helix-ipc/src/main/config/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set root logger level to ERROR and its only appender to A1.
+log4j.rootLogger=ERROR,A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java
new file mode 100644
index 0000000..9ea23ca
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java
@@ -0,0 +1,32 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import org.apache.helix.resolver.HelixMessageScope;
+
+import java.util.UUID;
+
+/**
+ * Callback registered per message type to handle messages.
+ */
+public interface HelixIPCCallback {
+ void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message);
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java
new file mode 100644
index 0000000..89b10eb
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java
@@ -0,0 +1,163 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.log4j.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A wrapper around a base IPC service that manages message retries / timeouts.
+ *
+ * This class manages retries and timeouts, and can be used in the same way as a
+ * {@link org.apache.helix.ipc.HelixIPCService}.
+ *
+ *
+ * A message will be sent until the max number of retries has been reached (i.e. timed out), or it
+ * is acknowledged by the recipient. If the max number of retries is -1, it will be retried forever.
+ *
+ *
+ * A callback should be registered for every acknowledgement message type associated with any
+ * original message type sent by this class.
+ *
+ *
+ * For example, consider we have the two message types defined: DATA_REQ = 1, DATA_ACK = 2. One
+ * would do the following:
+ *
+ *
+ * messageManager.registerCallback(DATA_ACK, new HelixIPCCallback() {
+ * public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+ * // Process ACK
+ * }
+ *
+ * // n.b. HelixIPCCallback declares only onMessage, so there is no onError hook;
+ * // when the retries run out, this class only logs a warning and does not
+ * // notify the callback.
+ * });
+ *
+ * messageManager.send(destination, DATA_REQ, messageId, data);
+ *
+ *
+ *
+ *
+ * In send, we note messageId, and retry until we get a DATA_ACK for the same messageId. The
+ * callback registered with the message manager will only be called once, even if the message is
+ * acknowledged several times.
+ *
+ */
+public class HelixIPCMessageManager implements HelixIPCService {
+
+ private static final Logger LOG = Logger.getLogger(HelixIPCMessageManager.class);
+
+ private final ScheduledExecutorService scheduler;
+ private final HelixIPCService baseIpcService;
+ private final long messageTimeoutMillis;
+ private final int maxNumRetries;
+ private final ConcurrentMap pendingMessages;
+ private final ConcurrentMap retriesLeft;
+ private final ConcurrentMap messageBuffers;
+ private final ConcurrentMap callbacks;
+
+ public HelixIPCMessageManager(ScheduledExecutorService scheduler, HelixIPCService baseIpcService,
+ long messageTimeoutMillis, int maxNumRetries) {
+ this.scheduler = scheduler;
+ this.baseIpcService = baseIpcService;
+ this.maxNumRetries = maxNumRetries;
+ this.messageTimeoutMillis = messageTimeoutMillis;
+ this.pendingMessages = new ConcurrentHashMap();
+ this.retriesLeft = new ConcurrentHashMap();
+ this.messageBuffers = new ConcurrentHashMap();
+ this.callbacks = new ConcurrentHashMap();
+ }
+
+ @Override
+ public void start() throws Exception {
+ baseIpcService.start();
+ }
+
+ @Override
+ public void shutdown() throws Exception {
+ baseIpcService.shutdown();
+ }
+
+ @Override
+ public void send(final HelixAddress destination, final int messageType, final UUID messageId,
+ final ByteBuf message) {
+ // State
+ pendingMessages.put(messageId, true);
+ retriesLeft.putIfAbsent(messageId, new AtomicInteger(maxNumRetries));
+ messageBuffers.put(messageId, message);
+
+ // Will free it when we've finally received response
+ message.retain();
+
+ // Send initial message
+ baseIpcService.send(destination, messageType, messageId, message);
+
+ // Retries
+ scheduler.schedule(new Runnable() {
+ @Override
+ public void run() {
+ Boolean isPending = pendingMessages.get(messageId);
+ AtomicInteger numLeft = retriesLeft.get(messageId);
+ if (numLeft != null && isPending != null && isPending) {
+ if (numLeft.decrementAndGet() > 0 || maxNumRetries == -1) {
+ // n.b. will schedule another retry
+ send(destination, messageType, messageId, message);
+ } else {
+ LOG.warn("Message " + messageId + " timed out after " + maxNumRetries + " retries");
+ }
+ }
+ }
+ }, messageTimeoutMillis, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void registerCallback(final int messageType, final HelixIPCCallback callback) {
+
+ // This callback will first check if the message is pending, then delegate to the provided
+ // callback if it has not yet done so.
+ HelixIPCCallback wrappedCallback = new HelixIPCCallback() {
+ @Override
+ public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+ if (pendingMessages.replace(messageId, true, false)) {
+ pendingMessages.remove(messageId);
+ ByteBuf originalMessage = messageBuffers.remove(messageId);
+ if (originalMessage != null) {
+ originalMessage.release();
+ }
+ retriesLeft.remove(messageId);
+ callback.onMessage(scope, messageId, message);
+ }
+ }
+ };
+
+ callbacks.put(messageType, wrappedCallback);
+ baseIpcService.registerCallback(messageType, wrappedCallback);
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java
new file mode 100644
index 0000000..6158514
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java
@@ -0,0 +1,49 @@
+package org.apache.helix.ipc;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.buffer.ByteBuf;
+import org.apache.helix.resolver.HelixAddress;
+
+import java.util.UUID;
+
+/**
+ * Allows message passing among instances in Helix clusters.
+ *
+ * Messages are sent asynchronously using {@link #send}, and handled by callbacks registered via
+ * {@link #registerCallback}
+ *
+ */
+public interface HelixIPCService {
+
+ static final String IPC_PORT = "IPC_PORT";
+
+ /** Starts service (must call before {@link #send}) */
+ void start() throws Exception;
+
+ /** Shuts down service and releases any resources */
+ void shutdown() throws Exception;
+
+ /** Sends a message to one or more instances that map to a cluster scope. */
+ void send(HelixAddress destination, int messageType, UUID messageId, ByteBuf message);
+
+ /** Registers a callback for a given message type */
+ void registerCallback(int messageType, HelixIPCCallback callback);
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
new file mode 100644
index 0000000..00d6157
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java
@@ -0,0 +1,490 @@
+package org.apache.helix.ipc.netty;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.helix.ipc.HelixIPCCallback;
+import org.apache.helix.ipc.HelixIPCService;
+import org.apache.helix.resolver.HelixAddress;
+import org.apache.helix.resolver.HelixMessageScope;
+import org.apache.log4j.Logger;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Provides partition/state-level messaging among nodes in a Helix cluster.
+ *
+ * The message format is (where len == 4B, and contains the length of the next field)
+ *
+ *
+ * +----------------------+
+ * | totalLength (4B) |
+ * +----------------------+
+ * | version (4B) |
+ * +----------------------+
+ * | messageType (4B) |
+ * +----------------------+
+ * | messageId (16B) |
+ * +----------------------+
+ * | len | cluster |
+ * +----------------------+
+ * | len | resource |
+ * +----------------------+
+ * | len | partition |
+ * +----------------------+
+ * | len | state |
+ * +----------------------+
+ * | len | srcInstance |
+ * +----------------------+
+ * | len | dstInstance |
+ * +----------------------+
+ * | len | message |
+ * +----------------------+
+ *
+ *
+ *
+ */
+public class NettyHelixIPCService implements HelixIPCService {
+
+ private static final Logger LOG = Logger.getLogger(NettyHelixIPCService.class);
+ private static final int MESSAGE_VERSION = 1;
+
+ // Parameters for length header field of message (tells decoder to interpret but preserve length
+ // field in message)
+ private static final int MAX_FRAME_LENGTH = 1024 * 1024;
+ private static final int LENGTH_FIELD_OFFSET = 0;
+ private static final int LENGTH_FIELD_LENGTH = 4;
+ private static final int LENGTH_ADJUSTMENT = -4;
+ private static final int INITIAL_BYTES_TO_STRIP = 0;
+ private static final int NUM_LENGTH_FIELDS = 7;
+
+ // Static configuration: instance name, listen port, connections per destination, channel lifetime
+ private final Config config;
+ // true while the service is stopped; flipped by start() and shutdown()
+ private final AtomicBoolean isShutdown;
+ // Outbound channels per destination socket address; the slot within the list is picked in send()
+ private final Map<InetSocketAddress, List<Channel>> channelMap;
+ // Wall-clock time (ms) at which each outbound channel was opened; consulted by isExpired()
+ private final ConcurrentMap<Channel, Long> channelOpenTimes;
+ private final MetricRegistry metricRegistry;
+ // Inbound message handlers keyed by message type
+ private final ConcurrentMap<Integer, HelixIPCCallback> callbacks;
+
+ // The fields below are (re)created by start()
+ private EventLoopGroup eventLoopGroup;
+ private Bootstrap clientBootstrap;
+ private Meter statTxMsg;
+ private Meter statRxMsg;
+ private Meter statTxBytes;
+ private Meter statRxBytes;
+ private Counter statChannelOpen;
+ private Counter statError;
+ private JmxReporter jmxReporter;
+
+ /**
+  * Creates a stopped service; call {@link #start()} before sending.
+  *
+  * @param config instance name, listen port and connection settings for this service
+  */
+ public NettyHelixIPCService(Config config) {
+   super();
+   this.config = config;
+   this.isShutdown = new AtomicBoolean(true);
+   // send() reads this map without holding its lock (check, lock, re-check), so the backing map
+   // must tolerate a concurrent put; a plain HashMap does not.
+   this.channelMap = new ConcurrentHashMap<InetSocketAddress, List<Channel>>();
+   this.channelOpenTimes = new ConcurrentHashMap<Channel, Long>();
+   this.metricRegistry = new MetricRegistry();
+   this.callbacks = new ConcurrentHashMap<Integer, HelixIPCCallback>();
+ }
+
+ /**
+  * Starts the service if it is currently stopped; otherwise does nothing.
+  *
+  * Creates the event loop group, registers the tx/rx/channel/error metrics and starts the JMX
+  * reporter, binds the inbound message server to the configured port, and prepares the client
+  * bootstrap that {@link #send} uses to open outbound channels.
+  *
+  * @throws Exception declared by {@link HelixIPCService#start()}
+  */
+ @Override
+ public void start() throws Exception {
+   if (isShutdown.getAndSet(false)) {
+     eventLoopGroup = new NioEventLoopGroup();
+
+     statTxMsg = metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "txMsg"));
+     statRxMsg = metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "rxMsg"));
+     statTxBytes =
+         metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "txBytes"));
+     statRxBytes =
+         metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "rxBytes"));
+     statChannelOpen =
+         metricRegistry.counter(MetricRegistry.name(NettyHelixIPCService.class, "channelOpen"));
+     statError = metricRegistry.counter(MetricRegistry.name(NettyHelixIPCService.class, "error"));
+
+     // Report metrics via JMX
+     jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
+     jmxReporter.start();
+
+     // Inbound side: split the byte stream into frames (length field kept in the frame), then
+     // decode each frame and dispatch it to the registered callback.
+     // NOTE(review): the bind future is not awaited, so a bind failure is not reported here.
+     new ServerBootstrap().group(eventLoopGroup).channel(NioServerSocketChannel.class)
+         .option(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_KEEPALIVE, true)
+         .childHandler(new ChannelInitializer<SocketChannel>() {
+           @Override
+           protected void initChannel(SocketChannel socketChannel) throws Exception {
+             socketChannel.pipeline().addLast(
+                 new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET,
+                     LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP));
+             socketChannel.pipeline().addLast(new HelixIPCCallbackHandler());
+           }
+         }).bind(new InetSocketAddress(config.getPort()));
+
+     // Outbound side: prepend the total-length field to every message written by send()
+     clientBootstrap =
+         new Bootstrap().group(eventLoopGroup).channel(NioSocketChannel.class)
+             .option(ChannelOption.SO_KEEPALIVE, true)
+             .handler(new ChannelInitializer<SocketChannel>() {
+               @Override
+               protected void initChannel(SocketChannel socketChannel) throws Exception {
+                 socketChannel.pipeline().addLast(
+                     new LengthFieldPrepender(LENGTH_FIELD_LENGTH, true));
+               }
+             });
+   }
+ }
+
+ /**
+  * Stops the JMX reporter and begins a graceful shutdown of the event loop group shared by the
+  * message server and the outbound client. Does nothing if the service is already stopped.
+  */
+ public void shutdown() throws Exception {
+   boolean alreadyStopped = isShutdown.getAndSet(true);
+   if (alreadyStopped) {
+     return;
+   }
+   jmxReporter.stop();
+   eventLoopGroup.shutdownGracefully();
+ }
+
+ /**
+  * Sends one message to the instance at {@code destination}.
+  *
+  * A channel to the destination socket address is picked by the hash of the destination scope
+  * and is (re)connected when it is missing, closed, or older than the configured maximum channel
+  * life. The header (version, type, ID, scope fields, source and destination instance names,
+  * payload length) is written to a new buffer and combined with {@code message} in a composite
+  * buffer, which is then written and flushed.
+  *
+  * @param destination target scope, instance name and socket address
+  * @param messageType application-defined type written to the header
+  * @param messageId unique message ID written to the header
+  * @param message payload; may be null, in which case a zero payload length is written
+  * @throws IllegalStateException if the message could not be sent for any reason (the cause is
+  *           attached and the error counter is incremented)
+  */
+ @Override
+ public void send(HelixAddress destination, int messageType, UUID messageId, ByteBuf message) {
+   // Send message
+   try {
+     // Get list of channels
+     List<Channel> channels = channelMap.get(destination.getSocketAddress());
+     if (channels == null) {
+       synchronized (channelMap) {
+         channels = channelMap.get(destination.getSocketAddress());
+         if (channels == null) {
+           // Pre-fill with nulls so every slot exists; channels are opened on first use
+           channels = new ArrayList<Channel>(config.getNumConnections());
+           for (int i = 0; i < config.getNumConnections(); i++) {
+             channels.add(null);
+           }
+           channelMap.put(destination.getSocketAddress(), channels);
+         }
+       }
+     }
+
+     // Pick the channel for this scope (mask keeps the hash non-negative)
+     int idx = (Integer.MAX_VALUE & destination.getScope().hashCode()) % channels.size();
+     Channel channel = channels.get(idx);
+     if (channel == null || !channel.isOpen() || isExpired(channel)) {
+       synchronized (channelMap) {
+         channel = channels.get(idx);
+         if (channel == null || !channel.isOpen() || isExpired(channel)) {
+           // Still open means it is being replaced because it expired
+           if (channel != null && channel.isOpen()) {
+             channel.close();
+           }
+           channel = clientBootstrap.connect(destination.getSocketAddress()).sync().channel();
+           channels.set(idx, channel);
+           statChannelOpen.inc();
+           channelOpenTimes.put(channel, System.currentTimeMillis());
+         }
+       }
+     }
+
+     // Compute total length (used as the initial capacity of the header buffer)
+     int headerLength =
+         NUM_LENGTH_FIELDS
+             * (Integer.SIZE / 8)
+             + (Integer.SIZE / 8)
+             * 2 // version, type
+             + (Long.SIZE / 8)
+             * 2 // 128 bit UUID
+             + getLength(destination.getScope().getCluster())
+             + getLength(destination.getScope().getResource())
+             + getLength(destination.getScope().getPartition())
+             + getLength(destination.getScope().getState()) + getLength(config.getInstanceName())
+             + getLength(destination.getInstanceName());
+     int messageLength = message == null ? 0 : message.readableBytes();
+
+     // Build message header
+     ByteBuf headerBuf = channel.alloc().buffer(headerLength);
+     headerBuf.writeInt(MESSAGE_VERSION).writeInt(messageType)
+         .writeLong(messageId.getMostSignificantBits())
+         .writeLong(messageId.getLeastSignificantBits());
+     writeStringWithLength(headerBuf, destination.getScope().getCluster());
+     writeStringWithLength(headerBuf, destination.getScope().getResource());
+     writeStringWithLength(headerBuf, destination.getScope().getPartition());
+     writeStringWithLength(headerBuf, destination.getScope().getState());
+     writeStringWithLength(headerBuf, config.getInstanceName());
+     writeStringWithLength(headerBuf, destination.getInstanceName());
+
+     // Compose message header and payload; the writer index is advanced by hand after each
+     // component is added
+     headerBuf.writeInt(messageLength);
+     CompositeByteBuf fullByteBuf = channel.alloc().compositeBuffer(2);
+     fullByteBuf.addComponent(headerBuf);
+     fullByteBuf.writerIndex(headerBuf.readableBytes());
+     if (message != null) {
+       fullByteBuf.addComponent(message);
+       fullByteBuf.writerIndex(fullByteBuf.writerIndex() + message.readableBytes());
+     }
+
+     // Send
+     statTxMsg.mark();
+     statTxBytes.mark(fullByteBuf.readableBytes());
+     channel.writeAndFlush(fullByteBuf);
+   } catch (Exception e) {
+     if (e instanceof InterruptedException) {
+       // sync() was interrupted while connecting: keep the interrupt visible to the caller
+       Thread.currentThread().interrupt();
+     }
+     statError.inc();
+     throw new IllegalStateException("Could not send message to " + destination, e);
+   }
+ }
+
+ /**
+  * Tells whether a channel has been open for at least the configured maximum channel life.
+  * A channel with no recorded open time is never considered expired.
+  */
+ private boolean isExpired(Channel channel) {
+   Long openedAtMillis = channelOpenTimes.get(channel);
+   if (openedAtMillis == null) {
+     return false;
+   }
+   long ageMillis = System.currentTimeMillis() - openedAtMillis;
+   return ageMillis >= config.getMaxChannelLifeMillis();
+ }
+
+ /**
+  * Stores {@code callback} as the handler for inbound messages of {@code messageType},
+  * replacing any handler previously stored for that type.
+  */
+ @Override
+ public void registerCallback(int messageType, HelixIPCCallback callback) {
+   callbacks.put(messageType, callback);
+ }
+
+ /**
+  * Inbound handler that decodes one framed message (layout described on the enclosing class)
+  * and dispatches it to the callback registered for its message type.
+  *
+  * The frame still contains its leading total-length field. String fields are decoded without
+  * moving the reader index, so the running offset {@code idx} is tracked by hand and the reader
+  * index is repositioned before each length field is read.
+  */
+ @ChannelHandler.Sharable
+ private class HelixIPCCallbackHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+   HelixIPCCallbackHandler() {
+     super(false); // we will manage reference
+   }
+
+   /**
+    * Decodes the header, validates the destination and callback, then invokes the callback with
+    * the buffer positioned at the first payload byte. The buffer is released when this method
+    * returns, whether or not decoding or the callback succeeded.
+    *
+    * @throws IllegalArgumentException if a field length exceeds the total message length
+    * @throws IllegalStateException if the destination instance is missing or is not this
+    *           instance, or no callback is registered for the message type
+    */
+   @Override
+   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
+     try {
+       int idx = 0;
+
+       // Message length
+       int messageLength = byteBuf.readInt();
+       idx += 4;
+
+       // Message version (read to advance past it; not interpreted)
+       byteBuf.readInt();
+       idx += 4;
+
+       // Message type
+       int messageType = byteBuf.readInt();
+       idx += 4;
+
+       // Message ID
+       UUID messageId = new UUID(byteBuf.readLong(), byteBuf.readLong());
+       idx += 16;
+
+       // Cluster
+       byteBuf.readerIndex(idx);
+       int clusterSize = byteBuf.readInt();
+       idx += 4;
+       checkLength("clusterSize", clusterSize, messageLength);
+       String clusterName = toNonEmptyString(clusterSize, byteBuf);
+       idx += clusterSize;
+
+       // Resource
+       byteBuf.readerIndex(idx);
+       int resourceSize = byteBuf.readInt();
+       idx += 4;
+       checkLength("resourceSize", resourceSize, messageLength);
+       String resourceName = toNonEmptyString(resourceSize, byteBuf);
+       idx += resourceSize;
+
+       // Partition
+       byteBuf.readerIndex(idx);
+       int partitionSize = byteBuf.readInt();
+       idx += 4;
+       checkLength("partitionSize", partitionSize, messageLength);
+       String partitionName = toNonEmptyString(partitionSize, byteBuf);
+       idx += partitionSize;
+
+       // State
+       byteBuf.readerIndex(idx);
+       int stateSize = byteBuf.readInt();
+       idx += 4;
+       checkLength("stateSize", stateSize, messageLength);
+       String state = toNonEmptyString(stateSize, byteBuf);
+       idx += stateSize;
+
+       // Source instance
+       byteBuf.readerIndex(idx);
+       int srcInstanceSize = byteBuf.readInt();
+       idx += 4;
+       checkLength("srcInstanceSize", srcInstanceSize, messageLength);
+       String srcInstance = toNonEmptyString(srcInstanceSize, byteBuf);
+       idx += srcInstanceSize;
+
+       // Destination instance
+       byteBuf.readerIndex(idx);
+       int dstInstanceSize = byteBuf.readInt();
+       idx += 4;
+       checkLength("dstInstanceSize", dstInstanceSize, messageLength);
+       String dstInstance = toNonEmptyString(dstInstanceSize, byteBuf);
+       idx += dstInstanceSize;
+
+       // Position at message (skip the 4-byte payload length)
+       byteBuf.readerIndex(idx + 4);
+
+       // Error check; the callback is looked up once so the check and the call agree
+       HelixIPCCallback callback = callbacks.get(messageType);
+       if (dstInstance == null) {
+         throw new IllegalStateException("Received message addressed to null destination from "
+             + srcInstance);
+       } else if (!dstInstance.equals(config.getInstanceName())) {
+         throw new IllegalStateException(config.getInstanceName()
+             + " received message addressed to " + dstInstance + " from " + srcInstance);
+       } else if (callback == null) {
+         throw new IllegalStateException("No callback registered for message type " + messageType);
+       }
+
+       // Build scope
+       HelixMessageScope scope =
+           new HelixMessageScope.Builder().cluster(clusterName).resource(resourceName)
+               .partition(partitionName).state(state).sourceInstance(srcInstance).build();
+
+       // Handle callback
+       callback.onMessage(scope, messageId, byteBuf);
+
+       // Stats
+       statRxMsg.mark();
+       statRxBytes.mark(messageLength);
+     } finally {
+       byteBuf.release();
+     }
+   }
+
+   /** Logs a failure raised while handling inbound data, including its stack trace. */
+   @Override
+   public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause) {
+     // Pass the throwable as the second argument so log4j records the stack trace
+     LOG.error("Error while handling inbound IPC message", cause);
+   }
+ }
+
+ /**
+  * Decodes {@code length} bytes starting at the buffer's current reader index into a String
+  * using the platform default charset. The reader index is not moved.
+  *
+  * @return the decoded string (empty when {@code length} is 0), or null if the buffer has fewer
+  *         than {@code length} readable bytes
+  */
+ private static String toNonEmptyString(int length, ByteBuf byteBuf) {
+   if (length > byteBuf.readableBytes()) {
+     return null;
+   }
+   int start = byteBuf.readerIndex();
+   return byteBuf.toString(start, length, Charset.defaultCharset());
+ }
+
+ /**
+  * Writes a length-prefixed string to {@code buf}: a 4-byte length followed by the string's
+  * bytes in the platform default charset (the charset the receiving side decodes with). A null
+  * string is written as the single length value 0.
+  *
+  * The length written is the encoded byte count, not the character count, so strings with
+  * characters that need more than one byte are framed correctly.
+  */
+ private static void writeStringWithLength(ByteBuf buf, String s) {
+   if (s == null) {
+     buf.writeInt(0);
+     return;
+   }
+
+   byte[] encoded = s.getBytes(Charset.defaultCharset());
+   buf.writeInt(encoded.length);
+   buf.writeBytes(encoded);
+ }
+
+ /**
+  * Null-tolerant string length.
+  *
+  * @return the number of chars in {@code s}, or 0 when {@code s} is null
+  */
+ private static int getLength(String s) {
+   if (s == null) {
+     return 0;
+   }
+   return s.length();
+ }
+
+ /**
+  * Validates a field length read from the wire before it is used to decode data.
+  *
+  * @param fieldName name of the length field, used in the error message
+  * @param length the length value read from the message
+  * @param messageLength total length of the enclosing message
+  * @throws java.lang.IllegalArgumentException if length is negative or length > messageLength
+  *           (attempt to prevent OOM exceptions and out-of-range reads)
+  */
+ private static void checkLength(String fieldName, int length, int messageLength)
+     throws IllegalArgumentException {
+   if (length < 0) {
+     // A negative length can only come from a corrupt or hostile frame
+     throw new IllegalArgumentException(fieldName + "=" + length + " is negative");
+   }
+   if (length > messageLength) {
+     throw new IllegalArgumentException(fieldName + "=" + length
+         + " is greater than messageLength=" + messageLength);
+   }
+ }
+
+ /**
+  * Mutable settings for a {@link NettyHelixIPCService}. Every setter returns this object so
+  * calls can be chained.
+  */
+ public static class Config {
+   private String instanceName;
+   private int port;
+   private int numConnections = 1; // default: one connection
+   private long maxChannelLifeMillis = 5000; // default: 5 seconds
+
+   /** @return the configured instance name, or null if none was set */
+   public String getInstanceName() {
+     return instanceName;
+   }
+
+   /** Sets the instance name and returns this config. */
+   public Config setInstanceName(String instanceName) {
+     this.instanceName = instanceName;
+     return this;
+   }
+
+   /** @return the configured port (0 if none was set) */
+   public int getPort() {
+     return port;
+   }
+
+   /** Sets the port and returns this config. */
+   public Config setPort(int port) {
+     this.port = port;
+     return this;
+   }
+
+   /** @return the configured number of connections (1 unless changed) */
+   public int getNumConnections() {
+     return numConnections;
+   }
+
+   /** Sets the number of connections and returns this config. */
+   public Config setNumConnections(int numConnections) {
+     this.numConnections = numConnections;
+     return this;
+   }
+
+   /** @return the maximum channel life in milliseconds (5000 unless changed) */
+   public long getMaxChannelLifeMillis() {
+     return maxChannelLifeMillis;
+   }
+
+   /** Sets the maximum channel life in milliseconds and returns this config. */
+   public Config setMaxChannelLifeMillis(long maxChannelLifeMillis) {
+     this.maxChannelLifeMillis = maxChannelLifeMillis;
+     return this;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java
new file mode 100644
index 0000000..c0fd2bb
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java
@@ -0,0 +1,295 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * A basic implementation of a resolver in terms of expiring routing tables
+ */
+public abstract class AbstractHelixResolver implements HelixResolver {
+ private static final Logger LOG = Logger.getLogger(AbstractHelixResolver.class);
+ private static final int DEFAULT_THREAD_POOL_SIZE = 10;
+ private static final long DEFAULT_LEASE_LENGTH_MS = 60 * 60 * 1000; // TODO: are these good
+                                                                     // values?
+ // Simple field of an instance's config record that holds its IPC port
+ private static final String IPC_PORT = "IPC_PORT";
+ // One spectator connection per cluster name. Lookups happen outside the lock (check, lock,
+ // re-check), so the map itself must be safe for concurrent reads and writes.
+ private final Map<String, Spectator> _connections;
+ // volatile: written by connect()/disconnect() and read by resolving threads without a lock
+ private volatile boolean _isConnected;
+ // Created by connect(); schedules lease expiry for spectators
+ private ScheduledExecutorService _executor;
+
+ /** Creates a resolver that is not yet connected; call {@link #connect()} before resolving. */
+ protected AbstractHelixResolver() {
+   _connections = Maps.newConcurrentMap();
+   _isConnected = false;
+ }
+
+ /**
+  * Marks the resolver as connected and creates the scheduler used for lease expiry.
+  * Calling this while already connected does nothing, so an existing thread pool is never
+  * replaced (and thereby leaked) by a second call.
+  */
+ @Override
+ public void connect() {
+   if (_isConnected) {
+     return;
+   }
+   _executor = Executors.newScheduledThreadPool(DEFAULT_THREAD_POOL_SIZE);
+   _isConnected = true;
+ }
+
+ /**
+  * Shuts down every spectator connection, stops the lease scheduler and marks the resolver as
+  * disconnected. Safe to call even if {@link #connect()} was never called.
+  */
+ @Override
+ public void disconnect() {
+   synchronized (_connections) {
+     // Iterate over a snapshot: shutdown() ends in expire(), which removes the spectator from
+     // _connections, and removing while iterating the live view would fail.
+     for (Spectator connection : new HashSet<Spectator>(_connections.values())) {
+       connection.shutdown();
+     }
+     _connections.clear();
+   }
+   if (_executor != null) {
+     _executor.shutdown();
+   }
+   _isConnected = false;
+ }
+
+ /**
+  * Resolves a message scope to the addresses of all instances that match it.
+  *
+  * Unset resource, partition or state in the scope act as wildcards: all resources, all
+  * partitions of each resource, and all states known to the cluster's routing table are used.
+  * Instances that have no IPC port registered, or whose registered port is not a number, are
+  * logged and skipped.
+  *
+  * @param scope the scope to resolve
+  * @return the matching addresses; empty if the scope is invalid or the resolver is not connected
+  */
+ @Override
+ public Set<HelixAddress> getDestinations(HelixMessageScope scope) {
+   if (!scope.isValid()) {
+     LOG.error("Scope " + scope + " is not valid!");
+     return new HashSet<HelixAddress>();
+   } else if (!_isConnected) {
+     LOG.error("Cannot resolve " + scope + " without first connecting!");
+     return new HashSet<HelixAddress>();
+   }
+
+   // Connect or refresh connection
+   String cluster = scope.getCluster();
+   ResolverRoutingTable routingTable;
+   Spectator connection = _connections.get(cluster);
+   if (connection == null || !connection.getManager().isConnected()) {
+     synchronized (_connections) {
+       connection = _connections.get(cluster);
+       if (connection == null || !connection.getManager().isConnected()) {
+         connection = new Spectator(cluster, DEFAULT_LEASE_LENGTH_MS);
+         connection.init();
+         _connections.put(cluster, connection);
+       }
+     }
+   }
+   routingTable = connection.getRoutingTable();
+
+   // Resolve all resources, either explicitly or match all
+   Set<String> resources;
+   if (scope.getResource() != null) {
+     resources = Sets.newHashSet(scope.getResource());
+   } else {
+     resources = routingTable.getResources();
+   }
+
+   // Resolve all partitions
+   Map<String, Set<String>> partitionMap = Maps.newHashMap();
+   if (scope.getPartition() != null) {
+     for (String resource : resources) {
+       partitionMap.put(resource, Sets.newHashSet(scope.getPartition()));
+     }
+   } else {
+     for (String resource : resources) {
+       partitionMap.put(resource, routingTable.getPartitions(resource));
+     }
+   }
+
+   // Resolve all states
+   Set<String> states;
+   if (scope.getState() != null) {
+     states = Sets.newHashSet(scope.getState());
+   } else {
+     states = routingTable.getStates();
+   }
+
+   // Get all the participants that match
+   Set<InstanceConfig> participants = Sets.newHashSet();
+   for (String resource : resources) {
+     for (String partition : partitionMap.get(resource)) {
+       for (String state : states) {
+         participants.addAll(routingTable.getInstances(resource, partition, state));
+       }
+     }
+   }
+
+   // Resolve those participants
+   Set<HelixAddress> result = new HashSet<HelixAddress>();
+   for (InstanceConfig participant : participants) {
+     String ipcPort = participant.getRecord().getSimpleField(IPC_PORT);
+     if (ipcPort == null) {
+       LOG.error("No ipc address registered for target instance " + participant.getInstanceName()
+           + ", skipping");
+       continue;
+     }
+
+     int port;
+     try {
+       port = Integer.parseInt(ipcPort);
+     } catch (NumberFormatException e) {
+       // One badly configured instance must not prevent resolving the others
+       LOG.error("Invalid ipc port '" + ipcPort + "' registered for target instance "
+           + participant.getInstanceName() + ", skipping", e);
+       continue;
+     }
+     result.add(new HelixAddress(scope, participant.getInstanceName(), new InetSocketAddress(
+         participant.getHostName(), port)));
+   }
+
+   return result;
+ }
+
+ /**
+  * Resolves the address of the source instance named in a scope.
+  *
+  * @param scope scope whose cluster and source instance are used
+  * @return the source instance's address, or null if the scope names no source instance
+  * @throws IllegalStateException if the source instance is unknown to the cluster's routing
+  *           table or has no IPC port registered
+  */
+ @Override
+ public HelixAddress getSource(HelixMessageScope scope) {
+   // Connect or refresh connection
+   String cluster = scope.getCluster();
+   ResolverRoutingTable routingTable;
+   Spectator connection = _connections.get(cluster);
+   if (connection == null || !connection.getManager().isConnected()) {
+     synchronized (_connections) {
+       connection = _connections.get(cluster);
+       if (connection == null || !connection.getManager().isConnected()) {
+         connection = new Spectator(cluster, DEFAULT_LEASE_LENGTH_MS);
+         connection.init();
+         _connections.put(cluster, connection);
+       }
+     }
+   }
+   routingTable = connection.getRoutingTable();
+
+   if (scope.getSourceInstance() != null) {
+     InstanceConfig config = routingTable.getInstanceConfig(scope.getSourceInstance());
+     if (config == null) {
+       // Report an unknown instance explicitly rather than failing on a null dereference
+       throw new IllegalStateException("No instance config found for source instance "
+           + scope.getSourceInstance());
+     }
+     String ipcPort = config.getRecord().getSimpleField(IPC_PORT);
+     if (ipcPort == null) {
+       throw new IllegalStateException("No IPC address registered for source instance "
+           + scope.getSourceInstance());
+     }
+     return new HelixAddress(scope, scope.getSourceInstance(), new InetSocketAddress(
+         config.getHostName(), Integer.valueOf(ipcPort)));
+   }
+
+   return null;
+ }
+
+ /** @return true between a call to {@link #connect()} and the next {@link #disconnect()} */
+ @Override
+ public boolean isConnected() {
+   return _isConnected;
+ }
+
+ /**
+  * Create a Helix manager connection based on the appropriate backing store.
+  * Called once from the constructor of each spectator; the spectator itself calls
+  * {@code connect()} on the returned manager when it is initialized.
+  * @param cluster the name of the cluster to connect to
+  * @return HelixManager instance
+  */
+ protected abstract HelixManager createManager(String cluster);
+
+ /**
+  * A leased connection to one cluster together with its routing table. Every request for the
+  * routing table renews the lease; when the lease runs out the spectator unregisters itself and
+  * disconnects its manager.
+  */
+ private class Spectator {
+   private final String _cluster;
+   private final HelixManager _manager;
+   private final ResolverRoutingTable _routingTable;
+   private final long _leaseLengthMs;
+   // Pending expiry task, or null before the first renewal
+   private ScheduledFuture<?> _future;
+
+   /**
+    * Initialize a spectator. This does not automatically connect.
+    * @param cluster the cluster to spectate
+    * @param leaseLengthMs the expiry of this spectator after the last request
+    */
+   public Spectator(String cluster, long leaseLengthMs) {
+     _cluster = cluster;
+     _manager = createManager(cluster);
+     _leaseLengthMs = leaseLengthMs;
+     _routingTable = new ResolverRoutingTable();
+   }
+
+   /**
+    * Connect and initialize the routing table. Failures are logged, not rethrown.
+    */
+   public void init() {
+     try {
+       _manager.connect();
+       _manager.addExternalViewChangeListener(_routingTable);
+       _manager.addInstanceConfigChangeListener(_routingTable);
+
+       // Force an initial refresh
+       HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+       List<ExternalView> externalViews =
+           accessor.getChildValues(accessor.keyBuilder().externalViews());
+       NotificationContext context = new NotificationContext(_manager);
+       context.setType(NotificationContext.Type.INIT);
+       _routingTable.onExternalViewChange(externalViews, context);
+       List<InstanceConfig> instanceConfigs =
+           accessor.getChildValues(accessor.keyBuilder().instanceConfigs());
+       _routingTable.onInstanceConfigChange(instanceConfigs, context);
+     } catch (Exception e) {
+       LOG.error("Error setting up routing table", e);
+     }
+   }
+
+   /**
+    * Clean up the connection to the spectated cluster
+    */
+   public void shutdown() {
+     resetFuture();
+     expire();
+   }
+
+   /**
+    * Get the dynamically-updating routing table for this cluster. Calling this renews the lease.
+    * @return ResolverRoutingTable, a RoutingTableProvider that can answer questions about its
+    *         contents
+    */
+   public ResolverRoutingTable getRoutingTable() {
+     renew();
+     return _routingTable;
+   }
+
+   public HelixManager getManager() {
+     return _manager;
+   }
+
+   /** Cancels any pending expiry and schedules a new one a full lease length from now. */
+   private synchronized void renew() {
+     resetFuture();
+
+     // Schedule this connection to expire if not renewed quickly enough
+     _future = _executor.schedule(new Runnable() {
+       @Override
+       public void run() {
+         expire();
+       }
+     }, _leaseLengthMs, TimeUnit.MILLISECONDS);
+
+   }
+
+   /** Cancels the pending expiry task, if there is one that has not completed. */
+   private synchronized void resetFuture() {
+     if (_future != null && !_future.isDone()) {
+       _future.cancel(true);
+     }
+   }
+
+   /** Unregisters this spectator and disconnects its manager if it is still connected. */
+   private void expire() {
+     synchronized (_connections) {
+       // Only unregister ourselves: if this spectator was already replaced (for example after
+       // its manager disconnected), the newer spectator must stay registered.
+       if (_connections.get(_cluster) == this) {
+         _connections.remove(_cluster);
+       }
+       if (_manager != null && _manager.isConnected()) {
+         _manager.disconnect();
+       }
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java
new file mode 100644
index 0000000..657e8bd
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java
@@ -0,0 +1,70 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Objects;
+
+import java.net.InetSocketAddress;
+
+ /**
+  * Immutable triple of a message scope, an instance name and that instance's socket address.
+  * Equality and hash code are based on all three components, any of which may be null.
+  */
+ public class HelixAddress {
+
+   private final HelixMessageScope scope;
+   private final String instanceName;
+   private final InetSocketAddress socketAddress;
+
+   /**
+    * @param scope the message scope this address was resolved for
+    * @param instanceName name of the instance at this address
+    * @param socketAddress socket address of the instance
+    */
+   public HelixAddress(HelixMessageScope scope, String instanceName, InetSocketAddress socketAddress) {
+     this.scope = scope;
+     this.instanceName = instanceName;
+     this.socketAddress = socketAddress;
+   }
+
+   public HelixMessageScope getScope() {
+     return scope;
+   }
+
+   public String getInstanceName() {
+     return instanceName;
+   }
+
+   public InetSocketAddress getSocketAddress() {
+     return socketAddress;
+   }
+
+   @Override
+   public String toString() {
+     return Objects.toStringHelper(this).addValue(scope).addValue(instanceName)
+         .addValue(socketAddress).toString();
+   }
+
+   @Override
+   public int hashCode() {
+     return Objects.hashCode(scope, instanceName, socketAddress);
+   }
+
+   /**
+    * Null-safe component-wise equality, matching {@link #hashCode()}, which also accepts null
+    * components.
+    */
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) {
+       return true;
+     }
+     if (!(o instanceof HelixAddress)) {
+       return false;
+     }
+     HelixAddress a = (HelixAddress) o;
+     return Objects.equal(a.getScope(), scope)
+         && Objects.equal(a.getInstanceName(), instanceName)
+         && Objects.equal(a.getSocketAddress(), socketAddress);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java
new file mode 100644
index 0000000..ba06556
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java
@@ -0,0 +1,151 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Objects;
+
+/**
+ * A definition of the addressing scope of a message.
+ */
+public class HelixMessageScope {
+ private final String _cluster;
+ private final String _resource;
+ private final String _partition;
+ private final String _state;
+ private final String _srcInstance;
+
+ private HelixMessageScope(String cluster, String resource, String partition, String state,
+ String srcInstance) {
+ _cluster = cluster;
+ _resource = resource;
+ _partition = partition;
+ _state = state;
+ _srcInstance = srcInstance;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this).addValue(_cluster).addValue(_resource).addValue(_partition)
+ .addValue(_state).toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(_cluster, _resource, _partition, _state);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof HelixMessageScope) {
+ HelixMessageScope that = (HelixMessageScope) other;
+ return Objects.equal(_cluster, that._cluster) && Objects.equal(_resource, that._resource)
+ && Objects.equal(_partition, that._partition) && Objects.equal(_state, that._state);
+ }
+ return false;
+ }
+
+ public String getCluster() {
+ return _cluster;
+ }
+
+ public String getResource() {
+ return _resource;
+ }
+
+ public String getPartition() {
+ return _partition;
+ }
+
+ public String getState() {
+ return _state;
+ }
+
+ public String getSourceInstance() {
+ return _srcInstance;
+ }
+
+ public boolean isValid() {
+ return _cluster != null && ((_partition != null && _resource != null) || _partition == null);
+ }
+
+ /**
+ * Creator for a HelixMessageScope
+ */
+ public static class Builder {
+ private String _cluster;
+ private String _resource;
+ private String _partition;
+ private String _state;
+ private String _sourceInstance;
+
+ /**
+ * Associate the scope with a cluster
+ * @param cluster the cluster to scope routing to
+ * @return Builder
+ */
+ public Builder cluster(String cluster) {
+ _cluster = cluster;
+ return this;
+ }
+
+ /**
+ * Associate the scope with a resource
+ * @param resource a resource served by the cluster
+ * @return Builder
+ */
+ public Builder resource(String resource) {
+ _resource = resource;
+ return this;
+ }
+
+ /**
+ * Associate the scope with a partition
+ * @param partition a specific partition of the scoped resource
+ * @return Builder
+ */
+ public Builder partition(String partition) {
+ _partition = partition;
+ return this;
+ }
+
+ /**
+ * Associate the scope with a state
+ * @param state a state that a resource in the cluster can be in
+ * @return Builder
+ */
+ public Builder state(String state) {
+ _state = state;
+ return this;
+ }
+
+ public Builder sourceInstance(String sourceInstance) {
+ _sourceInstance = sourceInstance;
+ return this;
+ }
+
+ /**
+ * Create the scope
+ * @return HelixMessageScope instance corresponding to the built scope
+ */
+ public HelixMessageScope build() {
+ return new HelixMessageScope(_cluster, _resource, _partition, _state, _sourceInstance);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java
new file mode 100644
index 0000000..b7f2f3e
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java
@@ -0,0 +1,47 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Set;
+
+/**
+ * An interface that resolves a message scope to a direct address.
+ */
+public interface HelixResolver {
+  /**
+   * Initialize a connection for scope resolution.
+   */
+  void connect();
+
+  /**
+   * Tear down any state and open connections to Helix clusters.
+   */
+  void disconnect();
+
+  /**
+   * Check the connection status
+   * @return true if connected, false otherwise
+   */
+  boolean isConnected();
+
+  /**
+   * Resolve a scope to the addresses of its destinations.
+   * @param scope the message scope to resolve
+   * @return the set of addresses the scope resolves to
+   */
+  Set<HelixAddress> getDestinations(HelixMessageScope scope);
+
+  /**
+   * Resolve the source address for a scope.
+   * NOTE(review): presumably this is the address of the instance returned by
+   * {@link HelixMessageScope#getSourceInstance()}; confirm against the implementations.
+   * @param scope the message scope to resolve
+   * @return the source address for the scope
+   */
+  HelixAddress getSource(HelixMessageScope scope);
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java b/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java
new file mode 100644
index 0000000..2bc5413
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java
@@ -0,0 +1,92 @@
+package org.apache.helix.resolver;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.spectator.RoutingTableProvider;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * A routing table that can also return all resources, partitions, and states in the cluster.
+ * The extra bookkeeping is rebuilt from scratch on every external view change, and all
+ * accessors return copies, so callers never see the internal collections.
+ */
+public class ResolverRoutingTable extends RoutingTableProvider {
+  /** Resource name to the partition names of that resource, as of the last external view change */
+  Map<String, Set<String>> _resourceMap;
+  /** All states seen for any partition of any resource in the last external view change */
+  Set<String> _stateSet;
+
+  /**
+   * Create the table.
+   */
+  public ResolverRoutingTable() {
+    super();
+    _resourceMap = Maps.newHashMap();
+    _stateSet = Sets.newHashSet();
+  }
+
+  /**
+   * Get all resources that are currently served in the cluster.
+   * @return set of resource names (a copy; modifying it does not affect the table)
+   */
+  public synchronized Set<String> getResources() {
+    return Sets.newHashSet(_resourceMap.keySet());
+  }
+
+  /**
+   * Get all partitions currently served for a resource.
+   * @param resource the resource for which to look up partitions
+   * @return set of partition names (a copy), or an empty set if the resource is unknown
+   */
+  public synchronized Set<String> getPartitions(String resource) {
+    Set<String> partitions = _resourceMap.get(resource);
+    if (partitions == null) {
+      return Collections.emptySet();
+    }
+    return Sets.newHashSet(partitions);
+  }
+
+  /**
+   * Get all states that partitions of all resources are currently in
+   * @return set of state names (a copy; modifying it does not affect the table)
+   */
+  public synchronized Set<String> getStates() {
+    return Sets.newHashSet(_stateSet);
+  }
+
+  /**
+   * Update the parent routing table, then rebuild the resource-to-partitions map and the
+   * state set from the supplied external views.
+   * @param externalViewList the external views delivered with the change
+   * @param changeContext the notification context, passed through to the parent
+   */
+  @Override
+  public synchronized void onExternalViewChange(List<ExternalView> externalViewList,
+      NotificationContext changeContext) {
+    super.onExternalViewChange(externalViewList, changeContext);
+    // Start from a clean slate so resources and states that disappeared are dropped
+    _resourceMap.clear();
+    _stateSet.clear();
+    for (ExternalView externalView : externalViewList) {
+      Set<String> partitions = externalView.getPartitionSet();
+      _resourceMap.put(externalView.getResourceName(), partitions);
+      for (String partition : partitions) {
+        _stateSet.addAll(externalView.getStateMap(partition).values());
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java
----------------------------------------------------------------------
diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java
new file mode 100644
index 0000000..48fa81a
--- /dev/null
+++ b/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java
@@ -0,0 +1,45 @@
+package org.apache.helix.resolver.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.resolver.AbstractHelixResolver;
+
+/**
+ * A {@link org.apache.helix.resolver.HelixResolver} whose Helix managers are backed by
+ * ZooKeeper.
+ */
+public class ZKHelixResolver extends AbstractHelixResolver {
+  private final String _zkAddress;
+
+  /**
+   * Create a resolver that talks to the given ZooKeeper ensemble
+   * @param zkConnectString the connection string to the ZooKeeper ensemble
+   */
+  public ZKHelixResolver(String zkConnectString) {
+    this._zkAddress = zkConnectString;
+  }
+
+  /**
+   * Create a spectator-type ZooKeeper Helix manager for a cluster.
+   * @param cluster the name of the cluster to create a manager for
+   * @return the manager obtained from {@link HelixManagerFactory}
+   */
+  @Override
+  protected HelixManager createManager(String cluster) {
+    // No instance name is supplied for the spectator manager
+    final String instanceName = null;
+    final InstanceType role = InstanceType.SPECTATOR;
+    return HelixManagerFactory.getZKHelixManager(cluster, instanceName, role, _zkAddress);
+  }
+}