hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [1/2] hbase git commit: HBASE-12684 Add new AsyncRpcClient (Jurriaan Mous)
Date Sun, 25 Jan 2015 02:21:49 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 e180f0bdd -> eb9978cc3


http://git-wip-us.apache.org/repos/asf/hbase/blob/eb9978cc/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index f04ec1e..20f24ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -44,6 +44,7 @@ import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -1408,9 +1409,9 @@ public class RpcServer implements RpcServerInterface {
       int count;
       // Check for 'HBas' magic.
       this.dataLengthBuffer.flip();
-      if (!HConstants.RPC_HEADER.equals(dataLengthBuffer)) {
+      if (!Arrays.equals(HConstants.RPC_HEADER, dataLengthBuffer.array())) {
         return doBadPreambleHandling("Expected HEADER=" +
-            Bytes.toStringBinary(HConstants.RPC_HEADER.array()) +
+            Bytes.toStringBinary(HConstants.RPC_HEADER) +
             " but received HEADER=" + Bytes.toStringBinary(dataLengthBuffer.array()) +
             " from " + toString());
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/eb9978cc/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index 2c70eb4..c246663 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -33,9 +33,18 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
 
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcChannel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.socket.SocketChannel;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -44,10 +53,13 @@ import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RowMutations;
@@ -90,7 +102,10 @@ import com.google.protobuf.ServiceException;
  */
 @Category(SmallTests.class)
 public class TestIPC {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
   public static final Log LOG = LogFactory.getLog(TestIPC.class);
+
   static byte [] CELL_BYTES =  Bytes.toBytes("xyz");
   static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
   static byte [] BIG_CELL_BYTES = new byte [10 * 1024];
@@ -190,8 +205,8 @@ public class TestIPC {
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       final String message = "hello";
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
-      Pair<Message, CellScanner> r = client.call(null, md, param, null,
-        md.getOutputType().toProto(), User.getCurrent(), address, 0);
+      Pair<Message, CellScanner> r = client.call(null, md, param,
+        md.getOutputType().toProto(), User.getCurrent(), address);
       assertTrue(r.getSecond() == null);
       // Silly assertion that the message is in the returned pb.
       assertTrue(r.getFirst().toString().contains(message));
@@ -202,6 +217,44 @@ public class TestIPC {
   }
 
   /**
+   * Ensure we do not HAVE TO HAVE a codec.
+   *
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  @Test public void testNoCodecAsync() throws InterruptedException, IOException, ServiceException
{
+    Configuration conf = HBaseConfiguration.create();
+    AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null)
{
+      @Override Codec getCodec() {
+        return null;
+      }
+    };
+    TestRpcServer rpcServer = new TestRpcServer();
+    try {
+      rpcServer.start();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      final String message = "hello";
+      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
+
+      BlockingRpcChannel channel = client
+          .createBlockingRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(),
+              System.currentTimeMillis()), User.getCurrent(), 0);
+
+      PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+      Message response =
+          channel.callBlockingMethod(md, controller, param, md.getOutputType().toProto());
+
+      assertTrue(controller.cellScanner() == null);
+      // Silly assertion that the message is in the returned pb.
+      assertTrue(response.toString().contains(message));
+    } finally {
+      client.close();
+      rpcServer.stop();
+    }
+  }
+
+  /**
    * It is hard to verify the compression is actually happening under the wraps.  Hope that
if
    * unsupported, we'll get an exception out of some time (meantime, have to trace it manually
    * to confirm that compression is happening down in the client and server).
@@ -212,13 +265,17 @@ public class TestIPC {
    */
   @Test
   public void testCompressCellBlock()
-  throws IOException, InterruptedException, SecurityException, NoSuchMethodException {
+      throws IOException, InterruptedException, SecurityException, NoSuchMethodException,
+      ServiceException {
     Configuration conf = new Configuration(HBaseConfiguration.create());
     conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
-    doSimpleTest(conf, new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT));
+    doSimpleTest(new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT));
+
+    // Another test for the async client
+    doAsyncSimpleTest(new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null));
   }
 
-  private void doSimpleTest(final Configuration conf, final RpcClientImpl client)
+  private void doSimpleTest(final RpcClientImpl client)
   throws InterruptedException, IOException {
     TestRpcServer rpcServer = new TestRpcServer();
     List<Cell> cells = new ArrayList<Cell>();
@@ -229,8 +286,11 @@ public class TestIPC {
       InetSocketAddress address = rpcServer.getListenerAddress();
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-      Pair<Message, CellScanner> r = client.call(null, md, param, CellUtil.createCellScanner(cells),
-        md.getOutputType().toProto(), User.getCurrent(), address, 0);
+
+      PayloadCarryingRpcController pcrc =
+          new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
+      Pair<Message, CellScanner> r = client
+          .call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
       int index = 0;
       while (r.getSecond().advance()) {
         assertTrue(CELL.equals(r.getSecond().current()));
@@ -243,6 +303,42 @@ public class TestIPC {
     }
   }
 
+  private void doAsyncSimpleTest(final AsyncRpcClient client)
+      throws InterruptedException, IOException, ServiceException {
+    TestRpcServer rpcServer = new TestRpcServer();
+    List<Cell> cells = new ArrayList<Cell>();
+    int count = 3;
+    for (int i = 0; i < count; i++)
+      cells.add(CELL);
+    try {
+      rpcServer.start();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+
+      PayloadCarryingRpcController pcrc =
+          new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
+
+      BlockingRpcChannel channel = client.createBlockingRpcChannel(
+          ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
+          User.getCurrent(), 0);
+
+      channel.callBlockingMethod(md, pcrc, param, md.getOutputType().toProto());
+
+      CellScanner cellScanner = pcrc.cellScanner();
+
+      int index = 0;
+      while (cellScanner.advance()) {
+        assertTrue(CELL.equals(cellScanner.current()));
+        index++;
+      }
+      assertEquals(count, index);
+    } finally {
+      client.close();
+      rpcServer.stop();
+    }
+  }
+
   @Test
   public void testRTEDuringConnectionSetup() throws Exception {
     Configuration conf = HBaseConfiguration.create();
@@ -263,7 +359,48 @@ public class TestIPC {
       InetSocketAddress address = rpcServer.getListenerAddress();
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-      client.call(null, md, param, null, null, User.getCurrent(), address, 0);
+      client.call(null, md, param, null, User.getCurrent(), address);
+      fail("Expected an exception to have been thrown!");
+    } catch (Exception e) {
+      LOG.info("Caught expected exception: " + e.toString());
+      assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
+    } finally {
+      client.close();
+      rpcServer.stop();
+    }
+  }
+
+  @Test
+  public void testRTEDuringAsyncBlockingConnectionSetup() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+
+    TestRpcServer rpcServer = new TestRpcServer();
+    AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null,
+        new ChannelInitializer<SocketChannel>() {
+
+          @Override protected void initChannel(SocketChannel ch) throws Exception {
+            ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
+              @Override
+              public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+                  throws Exception {
+                promise.setFailure(new RuntimeException("Injected fault"));
+              }
+            });
+          }
+        });
+    try {
+      rpcServer.start();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+
+      BlockingRpcChannel channel = client.createBlockingRpcChannel(
+          ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
+          User.getCurrent(), 0);
+
+      channel.callBlockingMethod(md, new PayloadCarryingRpcController(), param,
+          md.getOutputType().toProto());
+
       fail("Expected an exception to have been thrown!");
     } catch (Exception e) {
       LOG.info("Caught expected exception: " + e.toString());
@@ -274,6 +411,106 @@ public class TestIPC {
     }
   }
 
+
+  @Test
+  public void testRTEDuringAsyncConnectionSetup() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+
+    TestRpcServer rpcServer = new TestRpcServer();
+    AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null,
+        new ChannelInitializer<SocketChannel>() {
+
+          @Override protected void initChannel(SocketChannel ch) throws Exception {
+            ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
+              @Override
+              public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+                  throws Exception {
+                promise.setFailure(new RuntimeException("Injected fault"));
+              }
+            });
+          }
+        });
+    try {
+      rpcServer.start();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+
+      RpcChannel channel = client.createRpcChannel(
+          ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
+          User.getCurrent(), 0);
+
+      final AtomicBoolean done = new AtomicBoolean(false);
+
+      PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+      controller.notifyOnFail(new RpcCallback<IOException>() {
+        @Override
+        public void run(IOException e) {
+          done.set(true);
+          LOG.info("Caught expected exception: " + e.toString());
+          assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
+        }
+      });
+
+      channel.callMethod(md, controller, param,
+          md.getOutputType().toProto(), new RpcCallback<Message>() {
+            @Override
+            public void run(Message parameter) {
+              done.set(true);
+              fail("Expected an exception to have been thrown!");
+            }
+          });
+
+      TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return done.get();
+        }
+      });
+    } finally {
+      client.close();
+      rpcServer.stop();
+    }
+  }
+
+  @Test
+  public void testAsyncConnectionSetup() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+
+    TestRpcServer rpcServer = new TestRpcServer();
+    AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
+    try {
+      rpcServer.start();
+      InetSocketAddress address = rpcServer.getListenerAddress();
+      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+
+      RpcChannel channel = client.createRpcChannel(
+          ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()),
+          User.getCurrent(), 0);
+
+      final AtomicBoolean done = new AtomicBoolean(false);
+
+      channel.callMethod(md, new PayloadCarryingRpcController(), param,
+          md.getOutputType().toProto(), new RpcCallback<Message>() {
+            @Override
+            public void run(Message parameter) {
+              done.set(true);
+            }
+          });
+
+      TEST_UTIL.waitFor(1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return done.get();
+        }
+      });
+    } finally {
+      client.close();
+      rpcServer.stop();
+    }
+  }
+
   /** Tests that the rpc scheduler is called when requests arrive. */
   @Test
   public void testRpcScheduler() throws IOException, InterruptedException {
@@ -287,8 +524,43 @@ public class TestIPC {
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
       for (int i = 0; i < 10; i++) {
-        client.call(null, md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)),
-            md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(),
0);
+        client.call(
+            new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))),
+            md, param, md.getOutputType().toProto(), User.getCurrent(),
+            rpcServer.getListenerAddress());
+      }
+      verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
+    } finally {
+      rpcServer.stop();
+      verify(scheduler).stop();
+    }
+  }
+
+  /**
+   * Tests that the rpc scheduler is called when requests arrive.
+   */
+  @Test
+  public void testRpcSchedulerAsync()
+      throws IOException, InterruptedException, ServiceException {
+    RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
+    RpcServer rpcServer = new TestRpcServer(scheduler);
+    verify(scheduler).init((RpcScheduler.Context) anyObject());
+    AbstractRpcClient client = new AsyncRpcClient(CONF, HConstants.CLUSTER_ID_DEFAULT, null);
+    try {
+      rpcServer.start();
+      verify(scheduler).start();
+      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+      ServerName serverName = ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
+          rpcServer.getListenerAddress().getPort(), System.currentTimeMillis());
+
+      for (int i = 0; i < 10; i++) {
+        BlockingRpcChannel channel = client.createBlockingRpcChannel(
+            serverName, User.getCurrent(), 0);
+
+        channel.callBlockingMethod(md,
+            new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))),
+            param, md.getOutputType().toProto());
       }
       verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
     } finally {
@@ -340,9 +612,10 @@ public class TestIPC {
           // ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
           //  "Thread dump " + Thread.currentThread().getName());
         }
-        CellScanner cellScanner = CellUtil.createCellScanner(cells);
+        PayloadCarryingRpcController pcrc =
+            new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
         Pair<Message, CellScanner> response =
-            client.call(null, md, builder.build(), cellScanner, param, user, address, 0);
+            client.call(pcrc, md, builder.build(), param, user, address);
         /*
         int count = 0;
         while (p.getSecond().advance()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/eb9978cc/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
index 9cb1cc5..2c21ebd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import static org.mockito.Mockito.mock;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -48,13 +48,12 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
 
 @Category({SmallTests.class})
 public class TestRpcHandlerException {
@@ -177,8 +176,11 @@ public class TestRpcHandlerException {
       rpcServer.start();
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-      client.call(null, md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)), md
-        .getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0);
+      PayloadCarryingRpcController controller =
+          new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
+      
+      client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
+          rpcServer.getListenerAddress());
     } catch (Throwable e) {
       assert(abortable.isAborted() == true);
     } finally {


Mime
View raw message