Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0BBA2118CB for ; Thu, 28 Aug 2014 18:00:16 +0000 (UTC) Received: (qmail 51743 invoked by uid 500); 28 Aug 2014 18:00:15 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 51668 invoked by uid 500); 28 Aug 2014 18:00:15 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 51595 invoked by uid 99); 28 Aug 2014 18:00:15 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Aug 2014 18:00:15 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 80B51A04F67; Thu, 28 Aug 2014 18:00:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kanak@apache.org To: commits@helix.apache.org Date: Thu, 28 Aug 2014 18:00:16 -0000 Message-Id: <4da9f5cffb7642baaee269968cd458bd@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/4] git commit: [HELIX-470] Netty-based IPC layer [HELIX-470] Netty-based IPC layer Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f2475fa9 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f2475fa9 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f2475fa9 Branch: refs/heads/master Commit: f2475fa9a6123052fea2588cdd4e439ddc7af020 Parents: bfe5d2d Author: Greg Brandt Authored: Tue Aug 26 13:14:36 2014 -0700 Committer: Greg Brandt Committed: Tue Aug 26 13:14:36 2014 -0700 
---------------------------------------------------------------------- .../helix/spectator/RoutingTableProvider.java | 24 +- .../test/java/org/apache/helix/TestHelper.java | 10 + helix-ipc/LICENSE | 273 +++++++++++ helix-ipc/NOTICE | 33 ++ helix-ipc/pom.xml | 141 ++++++ helix-ipc/src/assemble/assembly.xml | 60 +++ helix-ipc/src/main/config/log4j.properties | 31 ++ .../org/apache/helix/ipc/HelixIPCCallback.java | 32 ++ .../helix/ipc/HelixIPCMessageManager.java | 163 ++++++ .../org/apache/helix/ipc/HelixIPCService.java | 49 ++ .../helix/ipc/netty/NettyHelixIPCService.java | 490 +++++++++++++++++++ .../helix/resolver/AbstractHelixResolver.java | 295 +++++++++++ .../org/apache/helix/resolver/HelixAddress.java | 70 +++ .../helix/resolver/HelixMessageScope.java | 151 ++++++ .../apache/helix/resolver/HelixResolver.java | 47 ++ .../helix/resolver/ResolverRoutingTable.java | 92 ++++ .../helix/resolver/zk/ZKHelixResolver.java | 45 ++ .../helix/ipc/TestNettyHelixIPCService.java | 353 +++++++++++++ .../helix/ipc/benchmark/BenchmarkDriver.java | 221 +++++++++ .../helix/resolver/TestZKHelixResolver.java | 161 ++++++ helix-ipc/src/test/resources/build_benchmark.sh | 29 ++ helix-ipc/src/test/resources/run_benchmark.sh | 37 ++ pom.xml | 1 + 23 files changed, 2806 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java index 7799ca1..ccce64a 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java @@ -38,7 +38,8 @@ import 
org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; import org.apache.log4j.Logger; -public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener { +public class RoutingTableProvider implements ExternalViewChangeListener, + InstanceConfigChangeListener { private static final Logger logger = Logger.getLogger(RoutingTableProvider.class); private final AtomicReference _routingTableRef; @@ -91,6 +92,15 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc return instanceSet; } + /** + * Get the configuration of an instance from its name + * @param instanceName the instance ID + * @return InstanceConfig if present, null otherwise + */ + public InstanceConfig getInstanceConfig(String instanceName) { + return _routingTableRef.get().getConfig(instanceName); + } + @Override public void onExternalViewChange(List externalViewList, NotificationContext changeContext) { @@ -124,12 +134,13 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor(); Builder keyBuilder = accessor.keyBuilder(); + RoutingTable newRoutingTable = new RoutingTable(); List configList = accessor.getChildValues(keyBuilder.instanceConfigs()); Map instanceConfigMap = new HashMap(); for (InstanceConfig config : configList) { instanceConfigMap.put(config.getId(), config); + newRoutingTable.addConfig(config); } - RoutingTable newRoutingTable = new RoutingTable(); if (externalViewList != null) { for (ExternalView extView : externalViewList) { String resourceName = extView.getId(); @@ -154,9 +165,11 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc class RoutingTable { private final HashMap resourceInfoMap; + private final Map instanceConfigMap; public RoutingTable() { resourceInfoMap = new HashMap(); + instanceConfigMap = new HashMap(); } public void addEntry(String 
resourceName, String partitionName, String state, @@ -169,10 +182,17 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc } + public void addConfig(InstanceConfig config) { + instanceConfigMap.put(config.getInstanceName(), config); + } + ResourceInfo get(String resourceName) { return resourceInfoMap.get(resourceName); } + InstanceConfig getConfig(String instanceName) { + return instanceConfigMap.get(instanceName); + } } class ResourceInfo { http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-core/src/test/java/org/apache/helix/TestHelper.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java index 4a9139f..64796ba 100644 --- a/helix-core/src/test/java/org/apache/helix/TestHelper.java +++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java @@ -20,7 +20,9 @@ package org.apache.helix; */ import java.io.File; +import java.io.IOException; import java.lang.reflect.Method; +import java.net.ServerSocket; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -769,4 +771,12 @@ public class TestHelper { System.out.println(sb.toString()); } + public static int getRandomPort() throws IOException { + ServerSocket sock = new ServerSocket(); + sock.bind(null); + int port = sock.getLocalPort(); + sock.close(); + return port; + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/LICENSE ---------------------------------------------------------------------- diff --git a/helix-ipc/LICENSE b/helix-ipc/LICENSE new file mode 100644 index 0000000..413913f --- /dev/null +++ b/helix-ipc/LICENSE @@ -0,0 +1,273 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. 
+ + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + + +For xstream: + +Copyright (c) 2003-2006, Joe Walnes +Copyright (c) 2006-2009, 2011 XStream Committers +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this list of +conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, this list of +conditions and the following disclaimer in the documentation and/or other materials provided +with the distribution. + +3. Neither the name of XStream nor the names of its contributors may be used to endorse +or promote products derived from this software without specific prior written +permission. 
+ +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES +OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT +SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED +TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY +WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +DAMAGE. + +for jline: + +Copyright (c) 2002-2006, Marc Prud'hommeaux +All rights reserved. + +Redistribution and use in source and binary forms, with or +without modification, are permitted provided that the following +conditions are met: + +Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + +Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with +the distribution. + +Neither the name of JLine nor the names of its contributors +may be used to endorse or promote products derived from this +software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, +BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY +AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO +EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, +OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING +IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED +OF THE POSSIBILITY OF SUCH DAMAGE. + + + http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/NOTICE ---------------------------------------------------------------------- diff --git a/helix-ipc/NOTICE b/helix-ipc/NOTICE new file mode 100644 index 0000000..1ee0d24 --- /dev/null +++ b/helix-ipc/NOTICE @@ -0,0 +1,33 @@ +Apache Helix +Copyright 2014 The Apache Software Foundation + + +I. Included Software + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). +Licensed under the Apache License 2.0. + +This product includes software developed at +Codehaus (http://www.codehaus.org/). +Licensed under the BSD License. + +This product includes software developed at +jline (http://jline.sourceforge.net/). +Licensed under the BSD License. + +This product includes software developed at +Google (http://www.google.com/). +Licensed under the Apache License 2.0. + +This product includes software developed at +snakeyaml (http://www.snakeyaml.org/). +Licensed under the Apache License 2.0. + +This product includes software developed at +zkclient (https://github.com/sgroschupf/zkclient). +Licensed under the Apache License 2.0. + +II. 
License Summary +- Apache License 2.0 +- BSD License http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/pom.xml ---------------------------------------------------------------------- diff --git a/helix-ipc/pom.xml b/helix-ipc/pom.xml new file mode 100644 index 0000000..90ab11d --- /dev/null +++ b/helix-ipc/pom.xml @@ -0,0 +1,141 @@ + + + + + org.apache.helix + helix + 0.7.1-SNAPSHOT + + 4.0.0 + + helix-ipc + bundle + + Apache Helix :: IPC + + + + javax.management*, + org.apache.commons.math*;version="[2.1,3)", + org.apache.log4j*;version="[1.2,2)", + org.restlet;version="[2.1.4,3]", + * + + + org.apache.helix.tools* + + org.apache.helix*;version="${project.version};-noimport:=true + + + + + org.apache.helix + helix-core + + + io.netty + netty-all + 4.0.21.Final + + + com.codahale.metrics + metrics-core + 3.0.1 + + + org.apache.helix + helix-core + test-jar + test + + + org.testng + testng + test + + + junit + junit + + + + + + + + ${basedir}/src/main/resources + true + + + ${basedir} + + DISCLAIMER + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + org.codehaus.mojo + appassembler-maven-plugin + + + + org.apache.helix.ipc.netty.NettyHelixIPCService + netty-helix-ipc-service + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + + jar-with-dependencies + + + + + package + + single + + + + + + + http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/assemble/assembly.xml ---------------------------------------------------------------------- diff --git a/helix-ipc/src/assemble/assembly.xml b/helix-ipc/src/assemble/assembly.xml new file mode 100644 index 0000000..c2d08a1 --- /dev/null +++ b/helix-ipc/src/assemble/assembly.xml @@ -0,0 +1,60 @@ + + + + pkg + + tar + + + + ${project.build.directory}/${project.artifactId}-pkg/bin + bin + unix + 0755 + 0755 + + + ${project.build.directory}/${project.artifactId}-pkg/repo/ + repo + 0755 + 0755 + + **/*.xml + + + + 
${project.build.directory}/${project.artifactId}-pkg/conf + conf + unix + 0755 + 0755 + + + ${project.basedir} + / + + LICENSE + NOTICE + DISCLAIMER + + 0755 + + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/config/log4j.properties ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/config/log4j.properties b/helix-ipc/src/main/config/log4j.properties new file mode 100644 index 0000000..4b3dc31 --- /dev/null +++ b/helix-ipc/src/main/config/log4j.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=ERROR,A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. 
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +log4j.logger.org.I0Itec=ERROR +log4j.logger.org.apache=ERROR http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java new file mode 100644 index 0000000..9ea23ca --- /dev/null +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCCallback.java @@ -0,0 +1,32 @@ +package org.apache.helix.ipc; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import io.netty.buffer.ByteBuf; +import org.apache.helix.resolver.HelixMessageScope; + +import java.util.UUID; + +/** + * Callback registered per message type to handle messages. 
+ */ +public interface HelixIPCCallback { + void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message); +} http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java new file mode 100644 index 0000000..89b10eb --- /dev/null +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCMessageManager.java @@ -0,0 +1,163 @@ +package org.apache.helix.ipc; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import io.netty.buffer.ByteBuf; +import org.apache.helix.resolver.HelixAddress; +import org.apache.helix.resolver.HelixMessageScope; +import org.apache.log4j.Logger; + +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A wrapper around a base IPC service that manages message retries / timeouts. + *

+ * This class manages retries and timeouts, and can be used in the same way as a + * {@link org.apache.helix.ipc.HelixIPCService}. + *

+ *

+ * A message will be sent until the max number of retries has been reached (i.e. timed out), or it + * is acknowledged by the recipient. If the max number of retries is -1, it will be retried forever. + *

+ *

+ * A callback should be registered for every acknowledgement message type associated with any + * original message type sent by this class. + *

+ *

+ * For example, suppose we have two message types defined: DATA_REQ = 1, DATA_ACK = 2. One + * would do the following: + * + *

+ * messageManager.registerCallback(DATA_ACK, new HelixIPCCallback() {
+ *   public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) {
+ *     // Process ACK
+ *   }
+ * 
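+ *   // HelixIPCCallback declares only onMessage; there is no error or timeout hook.
+ *   // A message that exhausts its retries is only logged as a warning by this class,
+ *   // so an onError method defined here would never be invoked.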
+ * });
+ * 
+ * messageManager.send(destination, DATA_REQ, messageId, data);
+ * 
+ * + *

+ *

+ * In send, we note messageId, and retry until we get a DATA_ACK for the same messageId. The + * callback registered with the message manager will only be called once, even if the message is + * acknowledged several times. + *

+ */ +public class HelixIPCMessageManager implements HelixIPCService { + + private static final Logger LOG = Logger.getLogger(HelixIPCMessageManager.class); + + private final ScheduledExecutorService scheduler; + private final HelixIPCService baseIpcService; + private final long messageTimeoutMillis; + private final int maxNumRetries; + private final ConcurrentMap pendingMessages; + private final ConcurrentMap retriesLeft; + private final ConcurrentMap messageBuffers; + private final ConcurrentMap callbacks; + + public HelixIPCMessageManager(ScheduledExecutorService scheduler, HelixIPCService baseIpcService, + long messageTimeoutMillis, int maxNumRetries) { + this.scheduler = scheduler; + this.baseIpcService = baseIpcService; + this.maxNumRetries = maxNumRetries; + this.messageTimeoutMillis = messageTimeoutMillis; + this.pendingMessages = new ConcurrentHashMap(); + this.retriesLeft = new ConcurrentHashMap(); + this.messageBuffers = new ConcurrentHashMap(); + this.callbacks = new ConcurrentHashMap(); + } + + @Override + public void start() throws Exception { + baseIpcService.start(); + } + + @Override + public void shutdown() throws Exception { + baseIpcService.shutdown(); + } + + @Override + public void send(final HelixAddress destination, final int messageType, final UUID messageId, + final ByteBuf message) { + // State + pendingMessages.put(messageId, true); + retriesLeft.putIfAbsent(messageId, new AtomicInteger(maxNumRetries)); + messageBuffers.put(messageId, message); + + // Will free it when we've finally received response + message.retain(); + + // Send initial message + baseIpcService.send(destination, messageType, messageId, message); + + // Retries + scheduler.schedule(new Runnable() { + @Override + public void run() { + Boolean isPending = pendingMessages.get(messageId); + AtomicInteger numLeft = retriesLeft.get(messageId); + if (numLeft != null && isPending != null && isPending) { + if (numLeft.decrementAndGet() > 0 || maxNumRetries == -1) { + // n.b. 
will schedule another retry + send(destination, messageType, messageId, message); + } else { + LOG.warn("Message " + messageId + " timed out after " + maxNumRetries + " retries"); + } + } + } + }, messageTimeoutMillis, TimeUnit.MILLISECONDS); + } + + @Override + public void registerCallback(final int messageType, final HelixIPCCallback callback) { + + // This callback will first check if the message is pending, then delegate to the provided + // callback if it has not yet done so. + HelixIPCCallback wrappedCallback = new HelixIPCCallback() { + @Override + public void onMessage(HelixMessageScope scope, UUID messageId, ByteBuf message) { + if (pendingMessages.replace(messageId, true, false)) { + pendingMessages.remove(messageId); + ByteBuf originalMessage = messageBuffers.remove(messageId); + if (originalMessage != null) { + originalMessage.release(); + } + retriesLeft.remove(messageId); + callback.onMessage(scope, messageId, message); + } + } + }; + + callbacks.put(messageType, wrappedCallback); + baseIpcService.registerCallback(messageType, wrappedCallback); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java new file mode 100644 index 0000000..6158514 --- /dev/null +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/HelixIPCService.java @@ -0,0 +1,49 @@ +package org.apache.helix.ipc; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import io.netty.buffer.ByteBuf; +import org.apache.helix.resolver.HelixAddress; + +import java.util.UUID; + +/** + * Allows message passing among instances in Helix clusters. + *

+ * Messages are sent asynchronously using {@link #send}, and handled by callbacks registered via + * {@link #registerCallback} + *

+ */ +public interface HelixIPCService { + + static final String IPC_PORT = "IPC_PORT"; + + /** Starts service (must call before {@link #send}) */ + void start() throws Exception; + + /** Shuts down service and releases any resources */ + void shutdown() throws Exception; + + /** Sends a message to one or more instances that map to a cluster scope. */ + void send(HelixAddress destination, int messageType, UUID messageId, ByteBuf message); + + /** Registers a callback for a given message type */ + void registerCallback(int messageType, HelixIPCCallback callback); +} http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java new file mode 100644 index 0000000..00d6157 --- /dev/null +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java @@ -0,0 +1,490 @@ +package org.apache.helix.ipc.netty; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldPrepender; + +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.helix.ipc.HelixIPCCallback; +import org.apache.helix.ipc.HelixIPCService; +import org.apache.helix.resolver.HelixAddress; +import org.apache.helix.resolver.HelixMessageScope; +import org.apache.log4j.Logger; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; + +/** + * Provides partition/state-level messaging among nodes in a Helix cluster. + *

+ * The message format is (where len == 4B, and contains the length of the next field) + * + *

+ *      +----------------------+
+ *      | totalLength (4B)     |
+ *      +----------------------+
+ *      | version (4B)         |
+ *      +----------------------+
+ *      | messageType (4B)     |
+ *      +----------------------+
+ *      | messageId (16B)      |
+ *      +----------------------+
+ *      | len | cluster        |
+ *      +----------------------+
+ *      | len | resource       |
+ *      +----------------------+
+ *      | len | partition      |
+ *      +----------------------+
+ *      | len | state          |
+ *      +----------------------+
+ *      | len | srcInstance    |
+ *      +----------------------+
+ *      | len | dstInstance    |
+ *      +----------------------+
+ *      | len | message        |
+ *      +----------------------+
+ * 
+ * + *

+ */ +public class NettyHelixIPCService implements HelixIPCService { + + private static final Logger LOG = Logger.getLogger(NettyHelixIPCService.class); + private static final int MESSAGE_VERSION = 1; + + // Parameters for length header field of message (tells decoder to interpret but preserve length + // field in message) + private static final int MAX_FRAME_LENGTH = 1024 * 1024; + private static final int LENGTH_FIELD_OFFSET = 0; + private static final int LENGTH_FIELD_LENGTH = 4; + private static final int LENGTH_ADJUSTMENT = -4; + private static final int INITIAL_BYTES_TO_STRIP = 0; + private static final int NUM_LENGTH_FIELDS = 7; + + private final Config config; + private final AtomicBoolean isShutdown; + private final Map> channelMap; + private final ConcurrentMap channelOpenTimes; + private final MetricRegistry metricRegistry; + private final ConcurrentMap callbacks; + + private EventLoopGroup eventLoopGroup; + private Bootstrap clientBootstrap; + private Meter statTxMsg; + private Meter statRxMsg; + private Meter statTxBytes; + private Meter statRxBytes; + private Counter statChannelOpen; + private Counter statError; + private JmxReporter jmxReporter; + + public NettyHelixIPCService(Config config) { + super(); + this.config = config; + this.isShutdown = new AtomicBoolean(true); + this.channelMap = new HashMap>(); + this.channelOpenTimes = new ConcurrentHashMap(); + this.metricRegistry = new MetricRegistry(); + this.callbacks = new ConcurrentHashMap(); + } + + /** + * Starts message handling server, creates client bootstrap, and bootstraps partition routing + * table. 
+ */ + public void start() throws Exception { + if (isShutdown.getAndSet(false)) { + eventLoopGroup = new NioEventLoopGroup(); + + statTxMsg = metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "txMsg")); + statRxMsg = metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "rxMsg")); + statTxBytes = + metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "txBytes")); + statRxBytes = + metricRegistry.meter(MetricRegistry.name(NettyHelixIPCService.class, "rxBytes")); + statChannelOpen = + metricRegistry.counter(MetricRegistry.name(NettyHelixIPCService.class, "channelOpen")); + statError = metricRegistry.counter(MetricRegistry.name(NettyHelixIPCService.class, "error")); + + // Report metrics via JMX + jmxReporter = JmxReporter.forRegistry(metricRegistry).build(); + jmxReporter.start(); + + new ServerBootstrap().group(eventLoopGroup).channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_KEEPALIVE, true) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + socketChannel.pipeline().addLast( + new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, + LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP)); + socketChannel.pipeline().addLast(new HelixIPCCallbackHandler()); + } + }).bind(new InetSocketAddress(config.getPort())); + + clientBootstrap = + new Bootstrap().group(eventLoopGroup).channel(NioSocketChannel.class) + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + socketChannel.pipeline().addLast( + new LengthFieldPrepender(LENGTH_FIELD_LENGTH, true)); + } + }); + } + } + + /** + * Shuts down event loops for message handling server and message passing client. 
+ */ + public void shutdown() throws Exception { + if (!isShutdown.getAndSet(true)) { + jmxReporter.stop(); + eventLoopGroup.shutdownGracefully(); + } + } + + /** + * Sends a message to all partitions with a given state in the cluster. + */ + @Override + public void send(HelixAddress destination, int messageType, UUID messageId, ByteBuf message) { + // Send message + try { + // Get list of channels + List channels = channelMap.get(destination.getSocketAddress()); + if (channels == null) { + synchronized (channelMap) { + channels = channelMap.get(destination.getSocketAddress()); + if (channels == null) { + channels = new ArrayList(config.getNumConnections()); + for (int i = 0; i < config.getNumConnections(); i++) { + channels.add(null); + } + channelMap.put(destination.getSocketAddress(), channels); + } + } + } + + // Pick the channel for this scope + int idx = (Integer.MAX_VALUE & destination.getScope().hashCode()) % channels.size(); + Channel channel = channels.get(idx); + if (channel == null || !channel.isOpen() || isExpired(channel)) { + synchronized (channelMap) { + channel = channels.get(idx); + if (channel == null || !channel.isOpen() || isExpired(channel)) { + if (channel != null && channel.isOpen()) { + channel.close(); + } + channel = clientBootstrap.connect(destination.getSocketAddress()).sync().channel(); + channels.set(idx, channel); + statChannelOpen.inc(); + channelOpenTimes.put(channel, System.currentTimeMillis()); + } + } + } + + // Compute total length + int headerLength = + NUM_LENGTH_FIELDS + * (Integer.SIZE / 8) + + (Integer.SIZE / 8) + * 2 // version, type + + (Long.SIZE / 8) + * 2 // 128 bit UUID + + getLength(destination.getScope().getCluster()) + + getLength(destination.getScope().getResource()) + + getLength(destination.getScope().getPartition()) + + getLength(destination.getScope().getState()) + getLength(config.getInstanceName()) + + getLength(destination.getInstanceName()); + int messageLength = message == null ? 
0 : message.readableBytes(); + + // Build message header + ByteBuf headerBuf = channel.alloc().buffer(headerLength); + headerBuf.writeInt(MESSAGE_VERSION).writeInt(messageType) + .writeLong(messageId.getMostSignificantBits()) + .writeLong(messageId.getLeastSignificantBits()); + writeStringWithLength(headerBuf, destination.getScope().getCluster()); + writeStringWithLength(headerBuf, destination.getScope().getResource()); + writeStringWithLength(headerBuf, destination.getScope().getPartition()); + writeStringWithLength(headerBuf, destination.getScope().getState()); + writeStringWithLength(headerBuf, config.getInstanceName()); + writeStringWithLength(headerBuf, destination.getInstanceName()); + + // Compose message header and payload + headerBuf.writeInt(messageLength); + CompositeByteBuf fullByteBuf = channel.alloc().compositeBuffer(2); + fullByteBuf.addComponent(headerBuf); + fullByteBuf.writerIndex(headerBuf.readableBytes()); + if (message != null) { + fullByteBuf.addComponent(message); + fullByteBuf.writerIndex(fullByteBuf.writerIndex() + message.readableBytes()); + } + + // Send + statTxMsg.mark(); + statTxBytes.mark(fullByteBuf.readableBytes()); + channel.writeAndFlush(fullByteBuf); + } catch (Exception e) { + statError.inc(); + throw new IllegalStateException("Could not send message to " + destination, e); + } + } + + private boolean isExpired(Channel channel) { + Long channelOpenTime = channelOpenTimes.get(channel); + return channelOpenTime != null + && System.currentTimeMillis() - channelOpenTime >= config.getMaxChannelLifeMillis(); + } + + @Override + public void registerCallback(int messageType, HelixIPCCallback callback) { + callbacks.put(messageType, callback); + } + + @ChannelHandler.Sharable + private class HelixIPCCallbackHandler extends SimpleChannelInboundHandler { + + HelixIPCCallbackHandler() { + super(false); // we will manage reference + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { + 
try { + int idx = 0; + + // Message length + int messageLength = byteBuf.readInt(); + idx += 4; + + // Message version + @SuppressWarnings("unused") + int messageVersion = byteBuf.readInt(); + idx += 4; + + // Message type + int messageType = byteBuf.readInt(); + idx += 4; + + // Message ID + UUID messageId = new UUID(byteBuf.readLong(), byteBuf.readLong()); + idx += 16; + + // Cluster + byteBuf.readerIndex(idx); + int clusterSize = byteBuf.readInt(); + idx += 4; + checkLength("clusterSize", clusterSize, messageLength); + String clusterName = toNonEmptyString(clusterSize, byteBuf); + idx += clusterSize; + + // Resource + byteBuf.readerIndex(idx); + int resourceSize = byteBuf.readInt(); + idx += 4; + checkLength("resourceSize", resourceSize, messageLength); + String resourceName = toNonEmptyString(resourceSize, byteBuf); + idx += resourceSize; + + // Partition + byteBuf.readerIndex(idx); + int partitionSize = byteBuf.readInt(); + idx += 4; + checkLength("partitionSize", partitionSize, messageLength); + String partitionName = toNonEmptyString(partitionSize, byteBuf); + idx += partitionSize; + + // State + byteBuf.readerIndex(idx); + int stateSize = byteBuf.readInt(); + idx += 4; + checkLength("stateSize", stateSize, messageLength); + String state = toNonEmptyString(stateSize, byteBuf); + idx += stateSize; + + // Source instance + byteBuf.readerIndex(idx); + int srcInstanceSize = byteBuf.readInt(); + idx += 4; + checkLength("srcInstanceSize", srcInstanceSize, messageLength); + String srcInstance = toNonEmptyString(srcInstanceSize, byteBuf); + idx += srcInstanceSize; + + // Destination instance + byteBuf.readerIndex(idx); + int dstInstanceSize = byteBuf.readInt(); + idx += 4; + checkLength("dstInstanceSize", dstInstanceSize, messageLength); + String dstInstance = toNonEmptyString(dstInstanceSize, byteBuf); + idx += dstInstanceSize; + + // Position at message + byteBuf.readerIndex(idx + 4); + + // Error check + if (dstInstance == null) { + throw new 
IllegalStateException("Received message addressed to null destination from " + + srcInstance); + } else if (!dstInstance.equals(config.getInstanceName())) { + throw new IllegalStateException(config.getInstanceName() + + " received message addressed to " + dstInstance + " from " + srcInstance); + } else if (callbacks.get(messageType) == null) { + throw new IllegalStateException("No callback registered for message type " + messageType); + } + + // Build scope + HelixMessageScope scope = + new HelixMessageScope.Builder().cluster(clusterName).resource(resourceName) + .partition(partitionName).state(state).sourceInstance(srcInstance).build(); + + // Get callback + HelixIPCCallback callback = callbacks.get(messageType); + if (callback == null) { + throw new IllegalStateException("No callback registered for message type " + messageType); + } + + // Handle callback + callback.onMessage(scope, messageId, byteBuf); + + // Stats + statRxMsg.mark(); + statRxBytes.mark(messageLength); + } finally { + byteBuf.release(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause) { + LOG.error(cause); + } + } + + /** Given a byte buf w/ a certain reader index, encodes the next length bytes as a String */ + private static String toNonEmptyString(int length, ByteBuf byteBuf) { + if (byteBuf.readableBytes() >= length) { + return byteBuf.toString(byteBuf.readerIndex(), length, Charset.defaultCharset()); + } + return null; + } + + /** Writes [s.length(), s] to buf, or [0] if s is null */ + private static void writeStringWithLength(ByteBuf buf, String s) { + if (s == null) { + buf.writeInt(0); + return; + } + + buf.writeInt(s.length()); + for (int i = 0; i < s.length(); i++) { + buf.writeByte(s.charAt(i)); + } + } + + /** Returns the length of a string, or 0 if s is null */ + private static int getLength(String s) { + return s == null ? 
0 : s.length(); + } + + /** + * @throws java.lang.IllegalArgumentException if length > messageLength (attempt to prevent OOM + * exceptions) + */ + private static void checkLength(String fieldName, int length, int messageLength) + throws IllegalArgumentException { + if (length > messageLength) { + throw new IllegalArgumentException(fieldName + "=" + length + + " is greater than messageLength=" + messageLength); + } + } + + public static class Config { + private String instanceName; + private int port; + private int numConnections = 1; + private long maxChannelLifeMillis = 5000; + + public Config setInstanceName(String instanceName) { + this.instanceName = instanceName; + return this; + } + + public Config setPort(int port) { + this.port = port; + return this; + } + + public Config setNumConnections(int numConnections) { + this.numConnections = numConnections; + return this; + } + + public Config setMaxChannelLifeMillis(long maxChannelLifeMillis) { + this.maxChannelLifeMillis = maxChannelLifeMillis; + return this; + } + + public String getInstanceName() { + return instanceName; + } + + public int getPort() { + return port; + } + + public int getNumConnections() { + return numConnections; + } + + public long getMaxChannelLifeMillis() { + return maxChannelLifeMillis; + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java new file mode 100644 index 0000000..c0fd2bb --- /dev/null +++ b/helix-ipc/src/main/java/org/apache/helix/resolver/AbstractHelixResolver.java @@ -0,0 +1,295 @@ +package org.apache.helix.resolver; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.NotificationContext; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; +import org.apache.log4j.Logger; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * A basic implementation of a resolver in terms of expiring routing tables + */ +public abstract class AbstractHelixResolver implements HelixResolver { + private static final Logger LOG = Logger.getLogger(AbstractHelixResolver.class); + private static final int DEFAULT_THREAD_POOL_SIZE = 10; + private static final long DEFAULT_LEASE_LENGTH_MS = 60 * 60 * 1000; // TODO: are these good + // values? 
+ private static final String IPC_PORT = "IPC_PORT"; + private final Map _connections; + private boolean _isConnected; + private ScheduledExecutorService _executor; + + protected AbstractHelixResolver() { + _connections = Maps.newHashMap(); + _isConnected = false; + } + + @Override + public void connect() { + _executor = Executors.newScheduledThreadPool(DEFAULT_THREAD_POOL_SIZE); + _isConnected = true; + } + + @Override + public void disconnect() { + synchronized (_connections) { + for (Spectator connection : _connections.values()) { + connection.shutdown(); + } + _connections.clear(); + } + _executor.shutdown(); + _isConnected = false; + } + + @Override + public Set getDestinations(HelixMessageScope scope) { + if (!scope.isValid()) { + LOG.error("Scope " + scope + " is not valid!"); + return new HashSet(); + } else if (!_isConnected) { + LOG.error("Cannot resolve " + scope + " without first connecting!"); + return new HashSet(); + } + + // Connect or refresh connection + String cluster = scope.getCluster(); + ResolverRoutingTable routingTable; + Spectator connection = _connections.get(cluster); + if (connection == null || !connection.getManager().isConnected()) { + synchronized (_connections) { + connection = _connections.get(cluster); + if (connection == null || !connection.getManager().isConnected()) { + connection = new Spectator(cluster, DEFAULT_LEASE_LENGTH_MS); + connection.init(); + _connections.put(cluster, connection); + } + } + } + routingTable = connection.getRoutingTable(); + + // Resolve all resources, either explicitly or match all + Set resources; + if (scope.getResource() != null) { + resources = Sets.newHashSet(scope.getResource()); + } else { + resources = routingTable.getResources(); + } + + // Resolve all partitions + Map> partitionMap = Maps.newHashMap(); + if (scope.getPartition() != null) { + for (String resource : resources) { + partitionMap.put(resource, Sets.newHashSet(scope.getPartition())); + } + } else { + for (String resource : 
resources) { + partitionMap.put(resource, routingTable.getPartitions(resource)); + } + } + + // Resolve all states + Set states; + if (scope.getState() != null) { + states = Sets.newHashSet(scope.getState()); + } else { + states = routingTable.getStates(); + } + + // Get all the participants that match + Set participants = Sets.newHashSet(); + for (String resource : resources) { + for (String partition : partitionMap.get(resource)) { + for (String state : states) { + participants.addAll(routingTable.getInstances(resource, partition, state)); + } + } + } + + // Resolve those participants + Set result = new HashSet(); + for (InstanceConfig participant : participants) { + String ipcPort = participant.getRecord().getSimpleField(IPC_PORT); + if (ipcPort == null) { + LOG.error("No ipc address registered for target instance " + participant.getInstanceName() + + ", skipping"); + } else { + result.add(new HelixAddress(scope, participant.getInstanceName(), new InetSocketAddress( + participant.getHostName(), Integer.valueOf(ipcPort)))); + } + } + + return result; + } + + @Override + public HelixAddress getSource(HelixMessageScope scope) { + // Connect or refresh connection + String cluster = scope.getCluster(); + ResolverRoutingTable routingTable; + Spectator connection = _connections.get(cluster); + if (connection == null || !connection.getManager().isConnected()) { + synchronized (_connections) { + connection = _connections.get(cluster); + if (connection == null || !connection.getManager().isConnected()) { + connection = new Spectator(cluster, DEFAULT_LEASE_LENGTH_MS); + connection.init(); + _connections.put(cluster, connection); + } + } + } + routingTable = connection.getRoutingTable(); + + if (scope.getSourceInstance() != null) { + InstanceConfig config = routingTable.getInstanceConfig(scope.getSourceInstance()); + String ipcPort = config.getRecord().getSimpleField(IPC_PORT); + if (ipcPort == null) { + throw new IllegalStateException("No IPC address registered for source 
instance " + + scope.getSourceInstance()); + } + return new HelixAddress(scope, scope.getSourceInstance(), new InetSocketAddress( + config.getHostName(), Integer.valueOf(ipcPort))); + } + + return null; + } + + @Override + public boolean isConnected() { + return _isConnected; + } + + /** + * Create a Helix manager connection based on the appropriate backing store + * @param cluster the name of the cluster to connect to + * @return HelixManager instance + */ + protected abstract HelixManager createManager(String cluster); + + private class Spectator { + private final String _cluster; + private final HelixManager _manager; + private final ResolverRoutingTable _routingTable; + private final long _leaseLengthMs; + private ScheduledFuture _future; + + /** + * Initialize a spectator. This does not automatically connect. + * @param cluster the cluster to spectate + * @param leaseLengthMs the expiry of this spectator after the last request + */ + public Spectator(String cluster, long leaseLengthMs) { + _cluster = cluster; + _manager = createManager(cluster); + _leaseLengthMs = leaseLengthMs; + _routingTable = new ResolverRoutingTable(); + } + + /** + * Connect and initialize the routing table + */ + public void init() { + try { + _manager.connect(); + _manager.addExternalViewChangeListener(_routingTable); + _manager.addInstanceConfigChangeListener(_routingTable); + + // Force an initial refresh + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + List externalViews = + accessor.getChildValues(accessor.keyBuilder().externalViews()); + NotificationContext context = new NotificationContext(_manager); + context.setType(NotificationContext.Type.INIT); + _routingTable.onExternalViewChange(externalViews, context); + List instanceConfigs = + accessor.getChildValues(accessor.keyBuilder().instanceConfigs()); + _routingTable.onInstanceConfigChange(instanceConfigs, context); + } catch (Exception e) { + LOG.error("Error setting up routing table", e); + } + } + + /** + * 
Clean up the connection to the spectated cluster + */ + public void shutdown() { + resetFuture(); + expire(); + } + + /** + * Get the dynamically-updating routing table for this cluster + * @return ResolverRoutingTable, a RoutingTableProvider that can answer questions about its + * contents + */ + public ResolverRoutingTable getRoutingTable() { + renew(); + return _routingTable; + } + + public HelixManager getManager() { + return _manager; + } + + private synchronized void renew() { + resetFuture(); + + // Schedule this connection to expire if not renewed quickly enough + _future = _executor.schedule(new Runnable() { + @Override + public void run() { + expire(); + } + }, _leaseLengthMs, TimeUnit.MILLISECONDS); + + } + + private synchronized void resetFuture() { + if (_future != null && !_future.isDone()) { + _future.cancel(true); + } + } + + private void expire() { + synchronized (_connections) { + _connections.remove(_cluster); + if (_manager != null && _manager.isConnected()) { + _manager.disconnect(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java new file mode 100644 index 0000000..657e8bd --- /dev/null +++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixAddress.java @@ -0,0 +1,70 @@ +package org.apache.helix.resolver; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.base.Objects; + +import java.net.InetSocketAddress; + +public class HelixAddress { + + private final HelixMessageScope scope; + private final String instanceName; + private final InetSocketAddress socketAddress; + + public HelixAddress(HelixMessageScope scope, String instanceName, InetSocketAddress socketAddress) { + this.scope = scope; + this.instanceName = instanceName; + this.socketAddress = socketAddress; + } + + public HelixMessageScope getScope() { + return scope; + } + + public String getInstanceName() { + return instanceName; + } + + public InetSocketAddress getSocketAddress() { + return socketAddress; + } + + @Override + public String toString() { + return Objects.toStringHelper(this).addValue(scope).addValue(instanceName) + .addValue(socketAddress).toString(); + } + + @Override + public int hashCode() { + return Objects.hashCode(scope, instanceName, socketAddress); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof HelixAddress)) { + return false; + } + HelixAddress a = (HelixAddress) o; + return a.getScope().equals(scope) && a.getInstanceName().equals(instanceName) + && a.getSocketAddress().equals(socketAddress); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java new 
file mode 100644 index 0000000..ba06556 --- /dev/null +++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixMessageScope.java @@ -0,0 +1,151 @@ +package org.apache.helix.resolver; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.base.Objects; +
/**
 * Describes what a message is addressed to: a cluster, optionally narrowed by resource,
 * partition and state, together with the instance the message originates from.
 * Instances are created through {@link Builder}.
 */
public class HelixMessageScope {
  private final String _cluster;
  private final String _resource;
  private final String _partition;
  private final String _state;
  private final String _srcInstance;

  /**
   * Copy the values collected by a builder into an immutable scope.
   * @param builder the builder holding the scope components
   */
  private HelixMessageScope(Builder builder) {
    _cluster = builder._cluster;
    _resource = builder._resource;
    _partition = builder._partition;
    _state = builder._state;
    _srcInstance = builder._srcInstance;
  }

  /**
   * String form built from cluster, resource, partition and state; the source instance is not
   * included.
   */
  @Override
  public String toString() {
    return Objects.toStringHelper(this)
        .addValue(_cluster)
        .addValue(_resource)
        .addValue(_partition)
        .addValue(_state)
        .toString();
  }

  /**
   * Hash over cluster, resource, partition and state; the source instance is not included.
   */
  @Override
  public int hashCode() {
    return Objects.hashCode(_cluster, _resource, _partition, _state);
  }

  /**
   * Scopes are equal when cluster, resource, partition and state match. The source instance
   * does not take part in the comparison.
   */
  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof HelixMessageScope)) {
      return false;
    }
    HelixMessageScope rhs = (HelixMessageScope) other;
    boolean sameCluster = Objects.equal(_cluster, rhs._cluster);
    boolean sameResource = Objects.equal(_resource, rhs._resource);
    boolean samePartition = Objects.equal(_partition, rhs._partition);
    boolean sameState = Objects.equal(_state, rhs._state);
    return sameCluster && sameResource && samePartition && sameState;
  }

  /**
   * @return the cluster name, or null if none was set
   */
  public String getCluster() {
    return _cluster;
  }

  /**
   * @return the resource name, or null if none was set
   */
  public String getResource() {
    return _resource;
  }

  /**
   * @return the partition name, or null if none was set
   */
  public String getPartition() {
    return _partition;
  }

  /**
   * @return the state name, or null if none was set
   */
  public String getState() {
    return _state;
  }

  /**
   * @return the source instance name, or null if none was set
   */
  public String getSourceInstance() {
    return _srcInstance;
  }

  /**
   * Check that the scope is well formed: a cluster must be set, and a partition may only be
   * set together with a resource.
   * @return true if the scope is well formed, false otherwise
   */
  public boolean isValid() {
    if (_cluster == null) {
      return false;
    }
    // A partition without its resource is the only invalid combination left.
    return _partition == null || _resource != null;
  }

  /**
   * Fluent creator for a HelixMessageScope. Every setter returns the builder itself so calls
   * can be chained; components that are never set stay null.
   */
  public static class Builder {
    private String _cluster;
    private String _resource;
    private String _partition;
    private String _state;
    private String _srcInstance;

    /**
     * Set the cluster of the scope
     * @param cluster the cluster to scope routing to
     * @return Builder
     */
    public Builder cluster(String cluster) {
      this._cluster = cluster;
      return this;
    }

    /**
     * Set the resource of the scope
     * @param resource a resource served by the cluster
     * @return Builder
     */
    public Builder resource(String resource) {
      this._resource = resource;
      return this;
    }

    /**
     * Set the partition of the scope
     * @param partition a specific partition of the scoped resource
     * @return Builder
     */
    public Builder partition(String partition) {
      this._partition = partition;
      return this;
    }

    /**
     * Set the state of the scope
     * @param state a state that a resource in the cluster can be in
     * @return Builder
     */
    public Builder state(String state) {
      this._state = state;
      return this;
    }

    /**
     * Set the source instance of the scope
     * @param sourceInstance the name of the source instance
     * @return Builder
     */
    public Builder sourceInstance(String sourceInstance) {
      this._srcInstance = sourceInstance;
      return this;
    }

    /**
     * Create the scope from the values set so far. No validation is performed here; see
     * {@link HelixMessageScope#isValid()}.
     * @return HelixMessageScope instance corresponding to the built scope
     */
    public HelixMessageScope build() {
      return new HelixMessageScope(this);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java new file mode 100644 index 0000000..b7f2f3e --- /dev/null +++ b/helix-ipc/src/main/java/org/apache/helix/resolver/HelixResolver.java @@ -0,0 +1,47 @@ +package org.apache.helix.resolver; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Set; +
/**
 * An interface that resolves a message scope to a direct address.
 */
public interface HelixResolver {
  /**
   * Initialize a connection for scope resolution.
   */
  void connect();

  /**
   * Tear down any state and open connections to Helix clusters.
   */
  void disconnect();

  /**
   * Check the connection status
   * @return true if connected, false otherwise
   */
  boolean isConnected();

  /**
   * Resolve a scope to the addresses a message with that scope should be delivered to.
   * @param scope the scope to resolve
   * @return the destination addresses for the scope
   */
  Set<HelixAddress> getDestinations(HelixMessageScope scope);

  /**
   * Resolve a scope to the address of its source.
   * NOTE(review): presumably this is the address of the instance named by
   * {@link HelixMessageScope#getSourceInstance()} - confirm against the implementations.
   * @param scope the scope to resolve
   * @return the source address for the scope
   */
  HelixAddress getSource(HelixMessageScope scope);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java b/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java new file mode 100644 index 0000000..2bc5413 --- /dev/null +++ b/helix-ipc/src/main/java/org/apache/helix/resolver/ResolverRoutingTable.java @@ -0,0 +1,92 @@ +package org.apache.helix.resolver; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.NotificationContext; +import org.apache.helix.model.ExternalView; +import org.apache.helix.spectator.RoutingTableProvider; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +
/**
 * A routing table that can also return all resources, partitions, and states in the cluster.
 * The extra bookkeeping is rebuilt from scratch on every external view change, and all
 * accessors are synchronized with that rebuild and return copies of the internal sets.
 */
public class ResolverRoutingTable extends RoutingTableProvider {
  /** Resource name to the partition names reported for it in the latest external views. */
  Map<String, Set<String>> _resourceMap;
  /** Every state value that appeared in the latest external views. */
  Set<String> _stateSet;

  /**
   * Create the table.
   */
  public ResolverRoutingTable() {
    super();
    _resourceMap = Maps.newHashMap();
    _stateSet = Sets.newHashSet();
  }

  /**
   * Get all resources that are currently served in the cluster.
   * @return set of resource names (a copy; changes to it do not affect the table)
   */
  public synchronized Set<String> getResources() {
    return Sets.newHashSet(_resourceMap.keySet());
  }

  /**
   * Get all partitions currently served for a resource.
   * @param resource the resource for which to look up partitions
   * @return set of partition names, or an empty set if the resource is unknown
   */
  public synchronized Set<String> getPartitions(String resource) {
    Set<String> partitions = _resourceMap.get(resource);
    if (partitions == null) {
      return Collections.emptySet();
    }
    return Sets.newHashSet(partitions);
  }

  /**
   * Get all states that partitions of all resources are currently in
   * @return set of state names (a copy; changes to it do not affect the table)
   */
  public synchronized Set<String> getStates() {
    return Sets.newHashSet(_stateSet);
  }

  /**
   * Let the parent routing table process the change, then rebuild the resource-to-partitions
   * map and the set of observed states from the supplied external views.
   * @param externalViewList the current external views, one per resource
   * @param changeContext the notification context passed on to the parent
   */
  @Override
  public synchronized void onExternalViewChange(List<ExternalView> externalViewList,
      NotificationContext changeContext) {
    super.onExternalViewChange(externalViewList, changeContext);
    // Start from a clean slate so resources and states that disappeared are dropped.
    _resourceMap.clear();
    _stateSet.clear();
    for (ExternalView externalView : externalViewList) {
      Set<String> partitions = externalView.getPartitionSet();
      _resourceMap.put(externalView.getResourceName(), partitions);
      for (String partition : partitions) {
        _stateSet.addAll(externalView.getStateMap(partition).values());
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/f2475fa9/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java ---------------------------------------------------------------------- diff --git a/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java b/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java new file mode 100644 index 0000000..48fa81a --- /dev/null +++ b/helix-ipc/src/main/java/org/apache/helix/resolver/zk/ZKHelixResolver.java @@ -0,0 +1,45 @@ +package org.apache.helix.resolver.zk; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.resolver.AbstractHelixResolver; +
/**
 * A {@link org.apache.helix.resolver.HelixResolver} whose Helix managers are backed by
 * ZooKeeper.
 */
public class ZKHelixResolver extends AbstractHelixResolver {
  private final String _zkAddress;

  /**
   * Create a ZK-based Helix resolver
   * @param zkConnectString the connection string to the ZooKeeper ensemble
   */
  public ZKHelixResolver(String zkConnectString) {
    this._zkAddress = zkConnectString;
  }

  /**
   * Build a spectator manager for the given cluster on the configured ZooKeeper ensemble.
   * @param cluster the name of the cluster to create a manager for
   * @return the manager obtained from {@link HelixManagerFactory}
   */
  @Override
  protected HelixManager createManager(String cluster) {
    // No instance name is supplied for the spectator role.
    final String instanceName = null;
    final InstanceType role = InstanceType.SPECTATOR;
    return HelixManagerFactory.getZKHelixManager(cluster, instanceName, role, _zkAddress);
  }
}