hbase-commits mailing list archives

From: te...@apache.org
Subject: hbase git commit: HBASE-19394 Support multi-homing env for the publication of RS status with multicast (hbase.status.published) (Toshihiro Suzuki)
Date: Tue, 12 Dec 2017 15:48:51 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-2 46d9b4cf0 -> 0e47ded1a


HBASE-19394 Support multi-homing env for the publication of RS status with multicast (hbase.status.published)
(Toshihiro Suzuki)
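
For context, a minimal sketch of how a deployment might wire up the two configuration keys this change introduces; the interface name "eth1" and the bind address below are illustrative placeholders, not values taken from the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

// Illustrative only: shows where the new keys plug in on a multi-homed host.
public class MultiHomedStatusConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Pre-existing switch that turns on multicast publication of cluster status.
    conf.setBoolean("hbase.status.published", true);
    // New in this patch: name the network interface to use for the multicast messages.
    conf.set(HConstants.STATUS_MULTICAST_NI_NAME, "eth1");
    // New in this patch: local address the publisher binds its sending socket to (default 0.0.0.0).
    conf.set(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS, "192.168.1.10");
  }
}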


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0e47ded1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0e47ded1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0e47ded1

Branch: refs/heads/branch-2
Commit: 0e47ded1a55958b2e2088433df025864c38f562e
Parents: 46d9b4c
Author: tedyu <yuzhihong@gmail.com>
Authored: Tue Dec 12 07:48:40 2017 -0800
Committer: tedyu <yuzhihong@gmail.com>
Committed: Tue Dec 12 07:48:40 2017 -0800

----------------------------------------------------------------------
 .../hbase/client/ClusterStatusListener.java     |  37 ++++---
 .../org/apache/hadoop/hbase/HConstants.java     |  16 ++-
 .../hbase/master/ClusterStatusPublisher.java    | 100 +++++++++++--------
 3 files changed, 93 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0e47ded1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
index 12e9a60..7f20436 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
@@ -20,16 +20,6 @@
 package org.apache.hadoop.hbase.client;
 
 
-import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufInputStream;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -48,13 +38,23 @@ import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufInputStream;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 
 /**
  * A class that receives the cluster status, and provide it as a set of service to the client.
@@ -104,7 +104,7 @@ class ClusterStatusListener implements Closeable {
      * Called to connect.
      *
      * @param conf Configuration to use.
-     * @throws IOException
+     * @throws IOException if failing to connect
      */
     void connect(Configuration conf) throws IOException;
   }
@@ -197,6 +197,7 @@ class ClusterStatusListener implements Closeable {
         HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS);
       int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
           HConstants.DEFAULT_STATUS_MULTICAST_PORT);
+      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);
 
       InetAddress ina;
       try {
@@ -219,7 +220,13 @@ class ClusterStatusListener implements Closeable {
         throw ExceptionUtil.asInterrupt(e);
       }
 
-      NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
+      NetworkInterface ni;
+      if (niName != null) {
+        ni = NetworkInterface.getByName(niName);
+      } else {
+        ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
+      }
+
       channel.joinGroup(ina, ni, null, channel.newPromise());
     }
 

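The listener-side change above boils down to the following interface selection, shown here as a standalone sketch; the fallback uses InetAddress.getLocalHost() only as a stand-in for HBase's Addressing.getIpAddress() helper:

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;

// Sketch of the interface selection the listener now performs before joining the multicast group.
final class ListenerInterfaceSelectionSketch {
  static NetworkInterface select(String niName) throws SocketException, UnknownHostException {
    if (niName != null) {
      // hbase.status.multicast.ni.name was set: use the named interface directly.
      return NetworkInterface.getByName(niName);
    }
    // Previous behaviour: derive the interface from a local IP address.
    return NetworkInterface.getByInetAddress(InetAddress.getLocalHost());
  }
}
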
http://git-wip-us.apache.org/repos/asf/hbase/blob/0e47ded1/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 594a895..30e8545 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -28,8 +28,8 @@ import java.util.UUID;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * HConstants holds a bunch of HBase-related constants
@@ -586,7 +586,7 @@ public final class HConstants {
   * Special! Used in fake Cells only. Should never be the timestamp on an actual Cell returned to
   * a client.
   * @deprecated Should not be public since hbase-1.3.0. For internal use only. Move internal to
-   * Scanners flagged as special timestamp value never to be returned as timestamp on a Cell.
+   *   Scanners flagged as special timestamp value never to be returned as timestamp on a Cell.
   */
   @Deprecated
   public static final long OLDEST_TIMESTAMP = Long.MIN_VALUE;
@@ -1187,6 +1187,18 @@ public final class HConstants {
   public static final String STATUS_MULTICAST_PORT = "hbase.status.multicast.address.port";
   public static final int DEFAULT_STATUS_MULTICAST_PORT = 16100;
 
+  /**
+   * The network interface name to use for the multicast messages.
+   */
+  public static final String STATUS_MULTICAST_NI_NAME = "hbase.status.multicast.ni.name";
+
+  /**
+   * The address to use for binding the local socket for sending multicast. Defaults to 0.0.0.0.
+   */
+  public static final String STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS =
+    "hbase.status.multicast.publisher.bind.address.ip";
+  public static final String DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS = "0.0.0.0";
+
   public static final long NO_NONCE = 0;
 
   /** Default cipher for encryption */

http://git-wip-us.apache.org/repos/asf/hbase/blob/0e47ded1/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
index 63cc96e..cbf4b1c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
@@ -21,22 +21,6 @@
 package org.apache.hadoop.hbase.master;
 
 
-import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
-import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.ChannelFactory;
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.Unpooled;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelException;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.InternetProtocolFamily;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.MessageToMessageEncoder;
-import org.apache.hadoop.hbase.shaded.io.netty.util.internal.StringUtil;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.Inet6Address;
@@ -58,9 +42,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
@@ -68,6 +49,25 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
+import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.ChannelFactory;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.Unpooled;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelException;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.InternetProtocolFamily;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.MessageToMessageEncoder;
+import org.apache.hadoop.hbase.shaded.io.netty.util.internal.StringUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 
 
 /**
@@ -252,6 +252,9 @@ public class ClusterStatusPublisher extends ScheduledChore {
           HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
       int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT,
           HConstants.DEFAULT_STATUS_MULTICAST_PORT);
+      String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS,
+        HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS);
+      String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME);
 
       final InetAddress ina;
       try {
@@ -264,24 +267,34 @@ public class ClusterStatusPublisher extends ScheduledChore {
       final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
 
       InternetProtocolFamily family;
-      InetAddress localAddress;
-      if (ina instanceof Inet6Address) {
-        localAddress = Addressing.getIp6Address();
-        family = InternetProtocolFamily.IPv6;
-      }else{
-        localAddress = Addressing.getIp4Address();
-        family = InternetProtocolFamily.IPv4;
+      NetworkInterface ni;
+      if (niName != null) {
+        if (ina instanceof Inet6Address) {
+          family = InternetProtocolFamily.IPv6;
+        } else {
+          family = InternetProtocolFamily.IPv4;
+        }
+        ni = NetworkInterface.getByName(niName);
+      } else {
+        InetAddress localAddress;
+        if (ina instanceof Inet6Address) {
+          localAddress = Addressing.getIp6Address();
+          family = InternetProtocolFamily.IPv6;
+        } else {
+          localAddress = Addressing.getIp4Address();
+          family = InternetProtocolFamily.IPv4;
+        }
+        ni = NetworkInterface.getByInetAddress(localAddress);
       }
-      NetworkInterface ni = NetworkInterface.getByInetAddress(localAddress);
 
       Bootstrap b = new Bootstrap();
       b.group(group)
-      .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
-      .option(ChannelOption.SO_REUSEADDR, true)
-      .handler(new ClusterStatusEncoder(isa));
+        .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
+        .option(ChannelOption.SO_REUSEADDR, true)
+        .handler(new ClusterStatusEncoder(isa));
 
       try {
-        channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel();
+        channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
         channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
         channel.connect(isa).sync();
       } catch (InterruptedException e) {
@@ -290,33 +303,34 @@ public class ClusterStatusPublisher extends ScheduledChore {
       }
     }
 
-    private static final class HBaseDatagramChannelFactory<T extends Channel> implements ChannelFactory<T> {
+    private static final class HBaseDatagramChannelFactory<T extends Channel>
+      implements ChannelFactory<T> {
       private final Class<? extends T> clazz;
       private InternetProtocolFamily family;
 
      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
-          this.clazz = clazz;
-          this.family = family;
+        this.clazz = clazz;
+        this.family = family;
       }
 
       @Override
       public T newChannel() {
-          try {
-            return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
-              new Class[] { InternetProtocolFamily.class }, new Object[] { family });
+        try {
+          return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
+            new Class[] { InternetProtocolFamily.class }, new Object[] { family });
 
-          } catch (Throwable t) {
-              throw new ChannelException("Unable to create Channel from class " + clazz, t);
-          }
+        } catch (Throwable t) {
+          throw new ChannelException("Unable to create Channel from class " + clazz, t);
+        }
       }
 
       @Override
       public String toString() {
-          return StringUtil.simpleClassName(clazz) + ".class";
+        return StringUtil.simpleClassName(clazz) + ".class";
       }
-  }
+    }
 
-    private static class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> {
+    private static final class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> {
       final private InetSocketAddress isa;
 
       private ClusterStatusEncoder(InetSocketAddress isa) {


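On the publisher side, the patch makes two changes visible above: the sending channel now binds to hbase.status.multicast.publisher.bind.address.ip instead of the wildcard address, and the outgoing network interface can be named explicitly. A condensed sketch of that selection logic, with a plain enum standing in for the shaded netty InternetProtocolFamily:

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;

// Sketch of the publisher-side selection added in this patch.
final class PublisherInterfaceSelectionSketch {
  enum Family { IPv4, IPv6 }  // stand-in for the shaded netty InternetProtocolFamily

  // The protocol family always follows the configured multicast address.
  static Family familyOf(InetAddress mcastAddress) {
    return (mcastAddress instanceof Inet6Address) ? Family.IPv6 : Family.IPv4;
  }

  // When hbase.status.multicast.ni.name is set, the named interface wins; otherwise the
  // interface is still derived from a local address, as before the patch.
  static NetworkInterface select(String niName, InetAddress localAddress) throws SocketException {
    return niName != null
        ? NetworkInterface.getByName(niName)
        : NetworkInterface.getByInetAddress(localAddress);
  }
}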