hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbau...@apache.org
Subject svn commit: r1310606 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/ipc/ test/java/org/apache/hadoop/hbase/ipc/
Date Fri, 06 Apr 2012 22:10:46 GMT
Author: mbautin
Date: Fri Apr  6 22:10:45 2012
New Revision: 1310606

URL: http://svn.apache.org/viewvc?rev=1310606&view=rev
Log:
[HBASE-5355] [89-fb] Compressed RPC's for HBase

Summary:
Needed by a lot of folks who want to do large batch inserts and large remote
cluster reads where network is the bottleneck. This is done at the RPC layer.

Does the following:

- Configuration to control whether RPCs are compressed (set "hbase.rpc.compression"
to the compression type, e.g. "gz")
- Version is bumped from 3 to 4, but is fully backward compatible. If
compression is set to none (the default), then the older version number is used.
- Adds a compression type field to each Call object on both the client and
server side

Test Plan:
Wrote a unit test and ran all the unit tests.

Reviewers: kannan, todd, stack, pkhemani

Reviewed By: kannan

CC: nzhang, nspiegelberg, mbautin, Liyin, gqchen, dhruba, khemani, Karthik,
Kannan

Differential Revision: https://reviews.facebook.net/D1671

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/ipc/
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/ipc/TestRPCCompression.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1310606&r1=1310605&r2=1310606&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Fri Apr 
6 22:10:45 2012
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase;
 
+import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -440,6 +441,13 @@ public final class HConstants {
    */
   public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000;
 
+  /**
+   * compression for each RPC and its default value
+   */
+  public static String HBASE_RPC_COMPRESSION_KEY = "hbase.rpc.compression";
+  public static Compression.Algorithm DEFAULT_HBASE_RPC_COMPRESSION =
+    Compression.Algorithm.NONE;
+
   public static final String
       REPLICATION_ENABLE_KEY = "hbase.replication";
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1310606&r1=1310605&r2=1310606&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Fri
Apr  6 22:10:45 2012
@@ -22,11 +22,13 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -43,11 +45,14 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -143,6 +148,9 @@ public class HBaseClient {
     Writable value;                               // value, null if error
     IOException error;                            // exception, null if value
     boolean done;                                 // true when call is done
+    protected Compression.Algorithm compressionAlgo =
+      Compression.Algorithm.NONE;
+    protected int version = HBaseServer.CURRENT_VERSION;
 
     protected Call(Writable param) {
       this.param = param;
@@ -177,6 +185,22 @@ public class HBaseClient {
       this.value = value;
       callComplete();
     }
+
+    public void setVersion(int version) {
+      this.version = version;
+    }
+
+    public int getVersion() {
+      return version;
+    }
+
+    public void setRPCCompression(Compression.Algorithm compressionAlgo) {
+      this.compressionAlgo = compressionAlgo;
+    }
+
+    public Compression.Algorithm getRPCCompression() {
+      return this.compressionAlgo;
+    }
   }
 
   /** Thread that reads responses and notifies callers.  Each connection owns a
@@ -194,10 +218,6 @@ public class HBaseClient {
     protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate
if the connection is closed
     private IOException closeException; // close reason
 
-    public Connection(InetSocketAddress address) throws IOException {
-      this(new ConnectionId(address, null, 0));
-    }
-
     public Connection(ConnectionId remoteId) throws IOException {
       if (remoteId.getAddress().isUnresolved()) {
         throw new UnknownHostException("unknown host: " +
@@ -292,7 +312,7 @@ public class HBaseClient {
      * the connection thread that waits for responses.
      * @throws java.io.IOException e
      */
-    protected synchronized void setupIOstreams() throws IOException {
+    protected synchronized void setupIOstreams(byte version) throws IOException {
       if (socket != null || shouldCloseConnection.get()) {
         return;
       }
@@ -325,7 +345,7 @@ public class HBaseClient {
             (new PingInputStream(NetUtils.getInputStream(socket))));
         this.out = new DataOutputStream
             (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
-        writeHeader();
+        writeHeader(version);
 
         // update last activity time
         touch();
@@ -386,9 +406,9 @@ public class HBaseClient {
     /* Write the header for each connection
      * Out is not synchronized because only the first thread does this.
      */
-    private void writeHeader() throws IOException {
+    private void writeHeader(byte version) throws IOException {
       out.write(HBaseServer.HEADER.array());
-      out.write(HBaseServer.CURRENT_VERSION);
+      out.write(version);
       //When there are more fields we can have ConnectionHeader Writable.
       DataOutputBuffer buf = new DataOutputBuffer();
       ObjectWritable.writeObject(buf, remoteId.getTicket(),
@@ -480,20 +500,43 @@ public class HBaseClient {
         return;
       }
 
-      DataOutputBuffer d=null;
+      DataOutputStream uncompressedOS = null;
+      DataOutputStream outOS = null;
       try {
         //noinspection SynchronizeOnNonFinalField
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + " sending #" + call.id);
 
-          //for serializing the
-          //data to be written
-          d = new DataOutputBuffer();
-          d.writeInt(call.id);
-          call.param.write(d);
-          byte[] data = d.getData();
-          int dataLength = d.getLength();
+          ByteArrayOutputStream baos = new ByteArrayOutputStream();
+          uncompressedOS = new DataOutputStream(baos);
+          outOS = uncompressedOS;
+
+          // 1. write the call id uncompressed
+          uncompressedOS.writeInt(call.id);
+
+          // preserve backwards compatibility
+          if (call.getRPCCompression() != Compression.Algorithm.NONE) {
+            // 2. write the compression algo used to compress the request being sent
+            uncompressedOS.writeUTF(call.getRPCCompression().getName());
+
+            // 3. write the compression algo to use for the response
+            uncompressedOS.writeUTF(call.getRPCCompression().getName());
+
+            // 4. setup the compressor
+            Compressor compressor = call.getRPCCompression().getCompressor();
+            OutputStream compressedOutputStream =
+              call.getRPCCompression().createCompressionStream(
+                uncompressedOS, compressor, 0);
+            outOS = new DataOutputStream(compressedOutputStream);
+          }
+
+          // 5. write the output params with the correct compression type
+          call.param.write(outOS);
+          outOS.flush();
+          baos.flush();
+          byte[] data = baos.toByteArray();
+          int dataLength = data.length;
           out.writeInt(dataLength);      //first put the data length
           out.write(data, 0, dataLength);//write the data
           out.flush();
@@ -503,7 +546,10 @@ public class HBaseClient {
       } finally {
         //the buffer is just an in-memory buffer, but it is still polite to
         // close early
-        IOUtils.closeStream(d);
+        if (outOS != uncompressedOS) {
+          IOUtils.closeStream(outOS);
+        }
+        IOUtils.closeStream(uncompressedOS);
       }
     }
 
@@ -517,22 +563,41 @@ public class HBaseClient {
       touch();
 
       try {
-        int id = in.readInt();                    // try to read an id
+        DataInputStream localIn = in;
 
+        // 1. Read the call id uncompressed which is an int
+        int id = localIn.readInt();
         if (LOG.isDebugEnabled())
           LOG.debug(getName() + " got value #" + id);
-
         Call call = calls.get(id);
 
-        boolean isError = in.readBoolean();     // read if error
+        // 2. read the error boolean uncompressed
+        boolean isError = localIn.readBoolean();
+
+        if (call.getVersion() >= HBaseServer.VERSION_COMPRESSED_RPC) {
+          // 3. read the compression type used for the rest of the response
+          String compressionAlgoName = localIn.readUTF();
+          Compression.Algorithm rpcCompression =
+            Compression.getCompressionAlgorithmByName(compressionAlgoName);
+
+          // 4. setup the correct decompressor (if any)
+          if (rpcCompression != Compression.Algorithm.NONE) {
+            Decompressor decompressor = rpcCompression.getDecompressor();
+            InputStream is = rpcCompression.createDecompressionStream(
+                  in, decompressor, 0);
+            localIn = new DataInputStream(is);
+          }
+        }
+
+        // 5. read the rest of the value
         if (isError) {
           //noinspection ThrowableInstanceNeverThrown
-          call.setException(new RemoteException( WritableUtils.readString(in),
-              WritableUtils.readString(in)));
+          call.setException(new RemoteException( WritableUtils.readString(localIn),
+              WritableUtils.readString(localIn)));
           calls.remove(id);
         } else {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
-          value.readFields(in);                 // read value
+          value.readFields(localIn);                 // read value
           call.setValue(value);
           calls.remove(id);
         }
@@ -725,13 +790,15 @@ public class HBaseClient {
    */
   public Writable call(Writable param, InetSocketAddress address)
   throws IOException {
-      return call(param, address, null, 0);
+      return call(param, address, null, 0, Compression.Algorithm.NONE);
   }
 
   public Writable call(Writable param, InetSocketAddress addr,
-                       UserGroupInformation ticket, int rpcTimeout)
+                       UserGroupInformation ticket, int rpcTimeout,
+                       Compression.Algorithm rpcCompression)
                        throws IOException {
     Call call = new Call(param);
+    call.setRPCCompression(rpcCompression);
     Connection connection = getConnection(addr, ticket, rpcTimeout, call);
     connection.sendParam(call);                 // send the parameter
     boolean interrupted = false;
@@ -804,7 +871,8 @@ public class HBaseClient {
    * @return  Writable[]
    * @throws IOException e
    */
-  public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
+  public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
+      Compression.Algorithm rpcCompression)
     throws IOException {
     if (addresses.length == 0) return new Writable[0];
 
@@ -814,6 +882,7 @@ public class HBaseClient {
     synchronized (results) {
       for (int i = 0; i < params.length; i++) {
         ParallelCall call = new ParallelCall(params[i], results, i);
+        call.setRPCCompression(rpcCompression);
         try {
           Connection connection = getConnection(addresses[i], null, 0, call);
           connection.sendParam(call);             // send each parameter
@@ -845,12 +914,20 @@ public class HBaseClient {
       // the client is stopped
       throw new IOException("The client is stopped");
     }
+    // RPC compression is only supported from version 4, so make backward compatible
+    byte version = HBaseServer.CURRENT_VERSION;
+    if (call.getRPCCompression() == Compression.Algorithm.NONE) {
+      version = HBaseServer.VERSION_3;
+    }
+    call.setVersion(version);
+
     Connection connection;
     /* we could avoid this allocation for each RPC by having a
      * connectionsId object and with set() method. We need to manage the
      * refs for keys in HashMap properly. For now its ok.
      */
-    ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout);
+    ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout,
+        call.getVersion());
     do {
       synchronized (connections) {
         connection = connections.get(remoteId);
@@ -865,24 +942,27 @@ public class HBaseClient {
     //block above. The reason for that is if the server happens to be slow,
     //it will take longer to establish a connection and that will slow the
     //entire system down.
-    connection.setupIOstreams();
+    connection.setupIOstreams(version);
     return connection;
   }
 
   /**
    * This class holds the address and the user ticket. The client connections
-   * to servers are uniquely identified by <remoteAddress, ticket>
+   * to servers are uniquely identified by
+   * <remoteAddress, ticket, RPC version>
    */
   private static class ConnectionId {
     final InetSocketAddress address;
     final UserGroupInformation ticket;
     final private int rpcTimeout;
+    final private int version;
 
     ConnectionId(InetSocketAddress address, UserGroupInformation ticket,
-        int rpcTimeout) {
+        int rpcTimeout, int version) {
       this.address = address;
       this.ticket = ticket;
       this.rpcTimeout = rpcTimeout;
+      this.version = version;
     }
 
     InetSocketAddress getAddress() {
@@ -897,7 +977,7 @@ public class HBaseClient {
      if (obj instanceof ConnectionId) {
        ConnectionId id = (ConnectionId) obj;
        return address.equals(id.address) && ticket == id.ticket &&
-       rpcTimeout == id.rpcTimeout;
+       rpcTimeout == id.rpcTimeout && version == id.version;
        //Note : ticket is a ref comparision.
      }
      return false;
@@ -905,7 +985,8 @@ public class HBaseClient {
 
     @Override
     public int hashCode() {
-      return address.hashCode() ^ System.identityHashCode(ticket) ^ rpcTimeout;
+      return address.hashCode() ^ System.identityHashCode(ticket) ^
+          rpcTimeout ^ version;
     }
   }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1310606&r1=1310605&r2=1310606&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Fri Apr
 6 22:10:45 2012
@@ -27,9 +27,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.VersionedProtocol;
@@ -237,6 +239,8 @@ public class HBaseRPC {
     private HBaseClient client;
     private boolean isClosed = false;
     final private int rpcTimeout;
+    private Compression.Algorithm rpcCompression =
+      HConstants.DEFAULT_HBASE_RPC_COMPRESSION;
 
     /**
      * @param address address for invoker
@@ -250,6 +254,11 @@ public class HBaseRPC {
       this.ticket = ticket;
       this.client = CLIENTS.getClient(conf, factory);
       this.rpcTimeout = rpcTimeout;
+      String compressionAlgo = conf.get(HConstants.HBASE_RPC_COMPRESSION_KEY);
+      if (compressionAlgo != null) {
+        rpcCompression =
+          Compression.getCompressionAlgorithmByName(compressionAlgo);
+      }
     }
 
     public Object invoke(Object proxy, Method method, Object[] args)
@@ -260,7 +269,8 @@ public class HBaseRPC {
         startTime = System.currentTimeMillis();
       }
       HbaseObjectWritable value = (HbaseObjectWritable)
-        client.call(new Invocation(method, args), address, ticket, rpcTimeout);
+        client.call(new Invocation(method, args), address, ticket,
+            rpcTimeout, rpcCompression);
       if (logDebug) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -477,8 +487,14 @@ public class HBaseRPC {
     for (int i = 0; i < params.length; i++)
       invocations[i] = new Invocation(method, params[i]);
     HBaseClient client = CLIENTS.getClient(conf);
+    Compression.Algorithm rpcCompression = Compression.Algorithm.NONE;
+    String compressionAlgo = conf.get(HConstants.HBASE_RPC_COMPRESSION_KEY);
+    if (compressionAlgo != null) {
+      rpcCompression =
+          Compression.getCompressionAlgorithmByName(compressionAlgo);
+    }
     try {
-    Writable[] wrappedValues = client.call(invocations, addrs);
+    Writable[] wrappedValues = client.call(invocations, addrs, rpcCompression);
 
     if (method.getReturnType() == Void.TYPE) {
       return null;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1310606&r1=1310605&r2=1310606&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Fri
Apr  6 22:10:45 2012
@@ -20,25 +20,12 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.WritableWithSize;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
-
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -57,13 +44,33 @@ import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
  * a port and is defined by a parameter class and a value class.
@@ -82,7 +89,11 @@ public abstract class HBaseServer {
 
   // 1 : Introduce ping and server does not throw away RPCs
   // 3 : RPC was refactored in 0.19
-  public static final byte CURRENT_VERSION = 3;
+  public static final byte VERSION_3 = 3;
+  // 4 : includes support for compression on RPCs
+  public static final byte VERSION_COMPRESSED_RPC = 4;
+
+  public static final byte CURRENT_VERSION = VERSION_COMPRESSED_RPC;
 
   /**
    * How many calls/handler are allowed in the queue.
@@ -207,6 +218,9 @@ public abstract class HBaseServer {
     protected long timestamp;      // the time received when response is null
                                    // the time served when response is not null
     protected ByteBuffer response;                // the response for this call
+    protected Compression.Algorithm compressionAlgo =
+      Compression.Algorithm.NONE;
+    protected int version = CURRENT_VERSION;     // version used for the call
 
     public Call(int id, Writable param, Connection connection) {
       this.id = id;
@@ -216,6 +230,22 @@ public abstract class HBaseServer {
       this.response = null;
     }
 
+    public void setVersion(int version) {
+     this.version = version;
+    }
+
+    public int getVersion() {
+      return version;
+    }
+
+    public void setRPCCompression(Compression.Algorithm compressionAlgo) {
+      this.compressionAlgo = compressionAlgo;
+    }
+
+    public Compression.Algorithm getRPCCompression() {
+      return this.compressionAlgo;
+    }
+
     @Override
     public String toString() {
       return param.toString() + " from " + connection.toString();
@@ -711,6 +741,7 @@ public abstract class HBaseServer {
   private class Connection {
     private boolean versionRead = false; //if initial signature and
                                          //version are read
+    private int version = -1;
     private boolean headerRead = false;  //if the connection header that
                                          //follows version is read.
     protected SocketChannel channel;
@@ -810,15 +841,18 @@ public abstract class HBaseServer {
           if (count <= 0) {
             return count;
           }
-          int version = versionBuffer.get(0);
+          version = versionBuffer.get(0);
 
           dataLengthBuffer.flip();
-          if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
+          if (!HEADER.equals(dataLengthBuffer) ||
+              version < VERSION_3 || version > CURRENT_VERSION) {
             //Warning is ok since this is not supposed to happen.
             LOG.warn("Incorrect header or version mismatch from " +
                      hostAddress + ":" + remotePort +
-                     " got version " + version +
-                     " expected version " + CURRENT_VERSION);
+                     " got header " + dataLengthBuffer +
+                     ", version " + version +
+                     " supported versions [" + VERSION_3 +
+                     " ... " + CURRENT_VERSION + "]");
             return -1;
           }
           dataLengthBuffer.clear();
@@ -868,17 +902,45 @@ public abstract class HBaseServer {
     }
 
     private void processData() throws  IOException, InterruptedException {
-      DataInputStream dis =
+      DataInputStream uncompressedIs =
         new DataInputStream(new ByteArrayInputStream(data.array()));
-      int id = dis.readInt();                    // try to read an id
+      Compression.Algorithm rxCompression = Algorithm.NONE;
+      Compression.Algorithm txCompression = Algorithm.NONE;
+      DataInputStream dis = uncompressedIs;
 
+      // 1. read the call id uncompressed
+      int id = uncompressedIs.readInt();
       if (LOG.isDebugEnabled())
         LOG.debug(" got #" + id);
 
-      Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
+      if (version >= VERSION_COMPRESSED_RPC) {
+
+        // 2. read the compression used for the request
+        String rxCompressionAlgoName = uncompressedIs.readUTF();
+        rxCompression =
+          Compression.getCompressionAlgorithmByName(rxCompressionAlgoName);
+
+        // 3. read the compression requested for the response
+        String txCompressionAlgoName = uncompressedIs.readUTF();
+        txCompression =
+          Compression.getCompressionAlgorithmByName(txCompressionAlgoName);
+
+        // 4. set up a decompressor to read the rest of the request
+        if (rxCompression != Compression.Algorithm.NONE) {
+          Decompressor decompressor = rxCompression.getDecompressor();
+          InputStream is = rxCompression.createDecompressionStream(
+              uncompressedIs, decompressor, 0);
+          dis = new DataInputStream(is);
+        }
+      }
+
+      // 5. read the rest of the params
+      Writable param = ReflectionUtils.newInstance(paramClass, conf);
       param.readFields(dis);
 
       Call call = new Call(id, param, this);
+      call.setRPCCompression(txCompression);
+      call.setVersion(version);
       callQueue.put(call);              // queue the call; maybe blocked here
     }
 
@@ -960,10 +1022,29 @@ public abstract class HBaseServer {
             }
           }
           ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
-          DataOutputStream out = new DataOutputStream(buf);
-          out.writeInt(call.id);                // write call id
-          out.writeBoolean(error != null);      // write error flag
+          DataOutputStream rawOS = new DataOutputStream(buf);
+          DataOutputStream out = rawOS;
+
+          // 1. write call id uncompressed
+          out.writeInt(call.id);
+
+          // 2. write error flag uncompressed
+          out.writeBoolean(error != null);
+
+          if (call.getVersion() >= VERSION_COMPRESSED_RPC) {
+            // 3. write the compression type for the rest of the response
+            out.writeUTF(call.getRPCCompression().getName());
+
+            // 4. create a compressed output stream if compression was enabled
+            if (call.getRPCCompression() != Compression.Algorithm.NONE) {
+              Compressor compressor = call.getRPCCompression().getCompressor();
+              OutputStream compressedOutputStream =
+                call.getRPCCompression().createCompressionStream(rawOS, compressor, 0);
+              out = new DataOutputStream(compressedOutputStream);
+            }
+          }
 
+          // 5. write the output as per the compression
           if (error == null) {
             value.write(out);
           } else {
@@ -971,6 +1052,8 @@ public abstract class HBaseServer {
             WritableUtils.writeString(out, error);
           }
 
+          out.flush();
+          buf.flush();
           call.setResponse(buf.getByteBuffer());
           responder.doRespond(call);
         } catch (InterruptedException e) {

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/ipc/TestRPCCompression.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/ipc/TestRPCCompression.java?rev=1310606&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/ipc/TestRPCCompression.java
(added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/ipc/TestRPCCompression.java
Fri Apr  6 22:10:45 2012
@@ -0,0 +1,113 @@
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestRPCCompression {
+  final Log LOG = LogFactory.getLog(getClass());
+  private static HBaseTestingUtility TEST_UTIL;
+  private static int SLAVES = 1;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // start the cluster
+    Configuration conf = HBaseConfiguration.create();
+    TEST_UTIL = new HBaseTestingUtility(conf);
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    // Nothing to do.
+  }
+
+  @Test
+  public void testCompressedRPC() throws Exception {
+    byte[] TABLE = Bytes.toBytes("testRPCCompression");
+    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
+    byte [] QUALIFIER = Bytes.toBytes("testQualifier");
+    byte [] VALUE = Bytes.toBytes("testValue");
+
+    // create a table
+    TEST_UTIL.createTable(TABLE, FAMILIES);
+    LOG.debug("Created table " + new String(TABLE));
+
+    // open the table with compressed RPC
+    Configuration conf = HBaseConfiguration.create();
+    String zkPortStr = TEST_UTIL.getConfiguration().get(
+        "hbase.zookeeper.property.clientPort");
+    conf.setInt("hbase.zookeeper.property.clientPort",
+        Integer.parseInt(zkPortStr));
+    conf.set(HConstants.HBASE_RPC_COMPRESSION_KEY,
+        Compression.Algorithm.GZ.getName());
+    HTable table = new HTable(conf, TABLE);
+
+    // put some values
+    byte [][] ROWS = { Bytes.toBytes("a"), Bytes.toBytes("b") };
+    for (int i = 0; i < ROWS.length; i++) {
+      Put put = new Put(ROWS[i]);
+      put.add(FAMILIES[0], QUALIFIER, VALUE);
+      table.put(put);
+    }
+    LOG.debug("Wrote some puts to table " + new String(TABLE));
+
+    // flush the table
+    table.flushCommits();
+    LOG.debug("Flushed table " + new String(TABLE));
+
+    // read back the values
+    for (int i = 0; i < ROWS.length; i++) {
+      Get get = new Get(ROWS[i]);
+      get.addColumn(FAMILIES[0], QUALIFIER);
+      Result result = table.get(get);
+
+      assertEquals(new String(VALUE),
+		  new String(result.getValue(FAMILIES[0], QUALIFIER)));
+    }
+    LOG.debug("Read and verified from table " + new String(TABLE));
+  }
+}



Mime
View raw message