hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From raw...@apache.org
Subject svn commit: r1033274 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/io/ src/main/java/org/apache/hadoop/hbase/ipc/
Date Tue, 09 Nov 2010 22:40:44 GMT
Author: rawson
Date: Tue Nov  9 22:40:26 2010
New Revision: 1033274

URL: http://svn.apache.org/viewvc?rev=1033274&view=rev
Log:
HBASE-3199  large response handling: some fixups and cleanups


Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Result.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1033274&r1=1033273&r2=1033274&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Nov  9 22:40:26 2010
@@ -664,6 +664,7 @@ Release 0.90.0 - Unreleased
    HBASE-3112  Enable and disable of table needs a bit of loving in new master
    HBASE-3207  If we get IOException when closing a region, we should still
                remove it from online regions and complete the close in ZK
+   HBASE-3199  large response handling: some fixups and cleanups
 
 
   IMPROVEMENTS

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Result.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Result.java?rev=1033274&r1=1033273&r2=1033274&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Result.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Result.java Tue Nov  9 22:40:26
2010
@@ -24,6 +24,7 @@ import com.google.common.collect.Orderin
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.SplitKeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.WritableWithSize;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 
@@ -65,7 +66,7 @@ import java.util.TreeMap;
  * through {@link KeyValue#getRow()}, {@link KeyValue#getFamily()}, {@link KeyValue#getQualifier()},
  * {@link KeyValue#getTimestamp()}, and {@link KeyValue#getValue()}.
  */
-public class Result implements Writable {
+public class Result implements Writable, WritableWithSize {
   private static final byte RESULT_VERSION = (byte)1;
 
   private KeyValue [] kvs = null;
@@ -523,6 +524,20 @@ public class Result implements Writable 
     this.kvs = kvs.toArray(new KeyValue[kvs.size()]);
   }
 
+  public long getWritableSize() {
+    if (isEmpty())
+      return Bytes.SIZEOF_INT; // int size = 0
+
+    long size = Bytes.SIZEOF_INT; // totalLen
+
+    for (KeyValue kv : kvs) {
+      size += kv.getLength();
+      size += Bytes.SIZEOF_INT; // kv.getLength
+    }
+
+    return size;
+  }
+
   public void write(final DataOutput out)
   throws IOException {
     if(isEmpty()) {
@@ -540,6 +555,29 @@ public class Result implements Writable 
     }
   }
 
+  public static long getWriteArraySize(Result [] results) {
+    long size = Bytes.SIZEOF_BYTE; // RESULT_VERSION
+    if (results == null || results.length == 0) {
+      size += Bytes.SIZEOF_INT;
+      return size;
+    }
+
+    size += Bytes.SIZEOF_INT; // results.length
+    size += Bytes.SIZEOF_INT; // bufLen
+    for (Result result : results) {
+      size += Bytes.SIZEOF_INT; // either 0 or result.size()
+      if (result == null || result.isEmpty())
+        continue;
+
+      for (KeyValue kv : result.raw()) {
+        size += Bytes.SIZEOF_INT; // kv.getLength();
+        size += kv.getLength();
+      }
+    }
+
+    return size;
+  }
+
   public static void writeArray(final DataOutput out, Result [] results)
   throws IOException {
     // Write version when writing array form.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1033274&r1=1033273&r2=1033274&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Tue Nov
 9 22:40:26 2010
@@ -96,7 +96,7 @@ import org.apache.hadoop.io.WritableFact
  * name and reflection to instantiate class was costing in excess of the cell
  * handling).
  */
-public class HbaseObjectWritable implements Writable, Configurable {
+public class HbaseObjectWritable implements Writable, WritableWithSize, Configurable {
   protected final static Log LOG = LogFactory.getLog(HbaseObjectWritable.class);
 
   // Here we maintain two static maps of classes to code and vice versa.
@@ -260,6 +260,10 @@ public class HbaseObjectWritable impleme
     writeObject(out, instance, declaredClass, conf);
   }
 
+  public long getWritableSize() {
+    return getWritableSize(instance, declaredClass, conf);
+  }
+
   private static class NullInstance extends Configured implements Writable {
     Class<?> declaredClass;
     /** default constructor for writable */
@@ -314,6 +318,27 @@ public class HbaseObjectWritable impleme
     out.writeByte(code);
   }
 
+
+  public static long getWritableSize(Object instance, Class declaredClass,
+                                     Configuration conf) {
+    long size = Bytes.SIZEOF_BYTE; // code
+    if (instance == null) {
+      return 0L;
+    }
+
+    if (declaredClass.isArray()) {
+      if (declaredClass.equals(Result[].class)) {
+
+        return size + Result.getWriteArraySize((Result[])instance);
+      }
+    }
+    if (declaredClass.equals(Result.class)) {
+      Result r = (Result) instance;
+      // one extra class code for writable instance.
+      return r.getWritableSize() + size + Bytes.SIZEOF_BYTE;
+    }
+    return 0L; // no hint is the default.
+  }
   /**
    * Write a {@link Writable}, {@link String}, primitive type, or an array of
    * the preceding.

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java?rev=1033274&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableWithSize.java Tue Nov  9
22:40:26 2010
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
/**
 * Optional mix-in interface for writables that can estimate their own
 * serialized size before being written.
 */
public interface WritableWithSize {
  /**
   * Returns a hint of how many bytes a subsequent write() will produce;
   * implementations should try not to exceed it.
   *
   * A return value of 0 means no size hint is available.
   *
   * @return the serialized size of the writable in bytes, or 0 for no hint
   */
  long getWritableSize();
}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java?rev=1033274&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ByteBufferOutputStream.java Tue
Nov  9 22:40:26 2010
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
/**
 * An OutputStream backed by a growable ByteBuffer.  Not thread safe!
 *
 * <p>The buffer grows geometrically as data is written.  Call
 * {@link #getByteBuffer()} only after all writing is done: it flips the
 * underlying buffer for reading, so the stream must not be used afterwards.
 */
public class ByteBufferOutputStream extends OutputStream {

  protected ByteBuffer buf;

  /**
   * @param capacity initial capacity of the backing buffer, in bytes
   */
  public ByteBufferOutputStream(int capacity) {
    this(capacity, false);
  }

  /**
   * @param capacity initial capacity of the backing buffer, in bytes
   * @param useDirectByteBuffer true to allocate a direct (off-heap) buffer
   */
  public ByteBufferOutputStream(int capacity, boolean useDirectByteBuffer) {
    buf = allocate(capacity, useDirectByteBuffer);
  }

  /** Allocate a buffer of the given capacity, direct or heap-backed. */
  private static ByteBuffer allocate(int capacity, boolean direct) {
    return direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
  }

  /** @return the number of bytes written so far */
  public int size() {
    return buf.position();
  }

  /**
   * This flips the underlying buffer so be sure to use it _last_: after this
   * call the stream must not be written to again.
   *
   * @return the backing buffer, positioned at 0 with its limit at size()
   */
  public ByteBuffer getByteBuffer() {
    buf.flip();
    return buf;
  }

  /**
   * Ensure there is room for {@code extra} more bytes, growing the buffer if
   * necessary.  All size arithmetic is done in long so that
   * {@code position + extra} cannot overflow int; growth doubles the capacity
   * (or jumps straight to the required size when doubling is not enough) and
   * preserves the direct-ness of the original buffer.
   *
   * @throws BufferOverflowException if the required capacity would exceed
   *         Integer.MAX_VALUE
   */
  private void checkSizeAndGrow(int extra) {
    long needed = (long) buf.position() + extra; // long math: no int overflow
    if (needed <= buf.limit()) {
      return;
    }
    if (needed > Integer.MAX_VALUE) {
      // Fail fast with a clear error rather than mis-sizing the buffer.
      throw new BufferOverflowException();
    }
    long doubled = (long) buf.capacity() * 2;
    int newSize = (int) Math.min(Math.max(doubled, needed), Integer.MAX_VALUE);

    ByteBuffer newBuf = allocate(newSize, buf.isDirect());
    buf.flip();
    newBuf.put(buf);
    buf = newBuf;
  }

  // OutputStream
  @Override
  public void write(int b) throws IOException {
    checkSizeAndGrow(1); // a single byte
    buf.put((byte) b);
  }

  @Override
  public void write(byte[] b) throws IOException {
    write(b, 0, b.length);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    checkSizeAndGrow(len);
    buf.put(b, off, len);
  }

  /** No-op: data is already in the in-memory buffer. */
  @Override
  public void flush() throws IOException {
  }

  /** No-op: there is no underlying resource to release. */
  @Override
  public void close() throws IOException {
  }
}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1033274&r1=1033273&r2=1033274&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Tue Nov  9 22:40:26
2010
@@ -55,6 +55,8 @@ import java.util.concurrent.LinkedBlocki
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.WritableWithSize;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
@@ -89,6 +91,14 @@ public abstract class HBaseServer {
    */
   private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
 
+  private static final String WARN_RESPONSE_SIZE =
+      "hbase.ipc.warn.response.size";
+
+  /** Default value for above param */
+  private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
+
+  private final int warnResponseSize;
+
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
 
@@ -989,6 +999,8 @@ public abstract class HBaseServer {
   /** Handles queued calls . */
   private class Handler extends Thread {
     private final BlockingQueue<Call> myCallQueue;
+    static final int BUFFER_INITIAL_SIZE = 1024;
+
     public Handler(final BlockingQueue<Call> cq, int instanceNumber) {
       this.myCallQueue = cq;
       this.setDaemon(true);
@@ -1005,8 +1017,6 @@ public abstract class HBaseServer {
     public void run() {
       LOG.info(getName() + ": starting");
       SERVER.set(HBaseServer.this);
-      final int buffersize = 16 * 1024;
-      ByteArrayOutputStream buf = new ByteArrayOutputStream(buffersize);
       while (running) {
         try {
           Call call = myCallQueue.take(); // pop the queue; maybe blocked here
@@ -1031,14 +1041,24 @@ public abstract class HBaseServer {
           }
           CurCall.set(null);
 
-          if (buf.size() > buffersize) {
-            // Allocate a new BAOS as reset only moves size back to zero but
-            // keeps the buffer of whatever the largest write was -- see
-            // hbase-900.
-            buf = new ByteArrayOutputStream(buffersize);
-          } else {
-            buf.reset();
+          int size = BUFFER_INITIAL_SIZE;
+          if (value instanceof WritableWithSize) {
+            // get the size hint.
+            WritableWithSize ohint = (WritableWithSize)value;
+            long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
+            if (hint > 0) {
+              if ((hint) > Integer.MAX_VALUE) {
+                // oops, new problem.
+                IOException ioe =
+                    new IOException("Result buffer size too large: " + hint);
+                errorClass = ioe.getClass().getName();
+                error = StringUtils.stringifyException(ioe);
+              } else {
+                size = (int)hint;
+              }
+            }
           }
+          ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
           DataOutputStream out = new DataOutputStream(buf);
           out.writeInt(call.id);                // write call id
           out.writeBoolean(error != null);      // write error flag
@@ -1049,7 +1069,14 @@ public abstract class HBaseServer {
             WritableUtils.writeString(out, errorClass);
             WritableUtils.writeString(out, error);
           }
-          call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
+
+          if (buf.size() > warnResponseSize) {
+            LOG.warn(getName()+", responseTooLarge for: "+call+": Size: "
+                     + StringUtils.humanReadableInt(buf.size()));
+          }
+
+
+          call.setResponse(buf.getByteBuffer());
           responder.doRespond(call);
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
@@ -1140,6 +1167,10 @@ public abstract class HBaseServer {
     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
     this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
 
+    this.warnResponseSize = conf.getInt(WARN_RESPONSE_SIZE,
+                                        DEFAULT_WARN_RESPONSE_SIZE);
+
+
     // Create the responder here
     responder = new Responder();
   }



Mime
View raw message