hbase-commits mailing list archives

From st...@apache.org
Subject hbase git commit: HBASE-12359 MulticastPublisher should specify IPv4/v6 protocol family when creating multicast channel (Qiang Tian)
Date Sat, 15 Nov 2014 05:56:11 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 30ecf5ae7 -> dcba04552


HBASE-12359 MulticastPublisher should specify IPv4/v6 protocol family when creating multicast channel (Qiang Tian)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dcba0455
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dcba0455
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dcba0455

Branch: refs/heads/branch-1
Commit: dcba04552688d3a66b1f1dd9df703525c59027da
Parents: 30ecf5a
Author: stack <stack@apache.org>
Authored: Fri Nov 14 21:55:15 2014 -0800
Committer: stack <stack@apache.org>
Committed: Fri Nov 14 21:55:59 2014 -0800

----------------------------------------------------------------------
 .../hbase/master/ClusterStatusPublisher.java    | 46 ++++++++++++++++++--
 1 file changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/dcba0455/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
index 7a7a58c..c60f9d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
@@ -22,16 +22,21 @@ package org.apache.hadoop.hbase.master;
 
 
 import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ChannelFactory;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelException;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.DatagramChannel;
 import io.netty.channel.socket.DatagramPacket;
+import io.netty.channel.socket.InternetProtocolFamily;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.handler.codec.MessageToMessageEncoder;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import io.netty.util.internal.StringUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.ClusterStatus;
@@ -43,11 +48,14 @@ import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
@@ -60,6 +68,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+
 /**
  * Class to publish the cluster status to the client. This allows them to know immediately
  *  the dead region servers, hence to cut the connection they have with them, eventually stop
@@ -261,12 +270,17 @@ public class ClusterStatusPublisher extends Chore {
       }
 
       final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
+      InternetProtocolFamily family = InternetProtocolFamily.IPv4;
+      if (ina instanceof Inet6Address) {
+        family = InternetProtocolFamily.IPv6;
+      }
 
       Bootstrap b = new Bootstrap();
+
       b.group(group)
-          .channel(NioDatagramChannel.class)
-          .option(ChannelOption.SO_REUSEADDR, true)
-          .handler(new ClusterStatusEncoder(isa));
+      .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
+      .option(ChannelOption.SO_REUSEADDR, true)
+      .handler(new ClusterStatusEncoder(isa));
 
       try {
         channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel();
@@ -278,6 +292,32 @@ public class ClusterStatusPublisher extends Chore {
       }
     }
 
+    private static final class HBaseDatagramChannelFactory<T extends Channel> implements ChannelFactory<T> {
+      private final Class<? extends T> clazz;
+      private InternetProtocolFamily family;
+
+      HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
+          this.clazz = clazz;
+          this.family = family;
+      }
+
+      @Override
+      public T newChannel() {
+          try {
+            return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(),
+              new Class[] { InternetProtocolFamily.class }, new Object[] { family });
+
+          } catch (Throwable t) {
+              throw new ChannelException("Unable to create Channel from class " + clazz, t);
+          }
+      }
+
+      @Override
+      public String toString() {
+          return StringUtil.simpleClassName(clazz) + ".class";
+      }
+  }
+
     private static class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> {
       final private InetSocketAddress isa;
 

