hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-16891 Try copying to the Netty ByteBuf directly from the WALEdit
Date Sat, 29 Oct 2016 15:32:14 GMT
Repository: hbase
Updated Branches:
  refs/heads/master ad0e862f7 -> 6127753b6


HBASE-16891 Try copying to the Netty ByteBuf directly from the WALEdit


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6127753b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6127753b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6127753b

Branch: refs/heads/master
Commit: 6127753b65b7724d7a7cbbc668b89f665700f94c
Parents: ad0e862
Author: zhangduo <zhangduo@apache.org>
Authored: Fri Oct 28 21:00:24 2016 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Sat Oct 29 23:30:52 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/io/ByteArrayOutputStream.java  |  11 +-
 .../hadoop/hbase/io/asyncfs/AsyncFSOutput.java  |  11 ++
 .../hbase/io/asyncfs/AsyncFSOutputHelper.java   | 101 ++++++++-----------
 .../asyncfs/FanOutOneBlockAsyncDFSOutput.java   |  58 ++++++++---
 .../wal/AsyncProtobufLogWriter.java             |  99 ++++++++++++------
 5 files changed, 171 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6127753b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
index d951595..93121df 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.io;
 
-import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
@@ -50,28 +49,28 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferSup
   }
 
   @Override
-  public void write(ByteBuffer b, int off, int len) throws IOException {
+  public void write(ByteBuffer b, int off, int len) {
     checkSizeAndGrow(len);
     ByteBufferUtils.copyFromBufferToArray(this.buf, b, off, this.pos, len);
     this.pos += len;
   }
 
   @Override
-  public void writeInt(int i) throws IOException {
+  public void writeInt(int i) {
     checkSizeAndGrow(Bytes.SIZEOF_INT);
     Bytes.putInt(this.buf, this.pos, i);
     this.pos += Bytes.SIZEOF_INT;
   }
 
   @Override
-  public void write(int b) throws IOException {
+  public void write(int b) {
     checkSizeAndGrow(Bytes.SIZEOF_BYTE);
     buf[this.pos] = (byte) b;
     this.pos++;
   }
 
   @Override
-  public void write(byte[] b, int off, int len) throws IOException {
+  public void write(byte[] b, int off, int len) {
     checkSizeAndGrow(len);
     System.arraycopy(b, off, this.buf, this.pos, len);
     this.pos += len;
@@ -109,7 +108,7 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferSup
    * Copies the content of this Stream into a new byte array.
    * @return  the contents of this output stream, as new byte array.
    */
-  public byte toByteArray()[] {
+  public byte[] toByteArray() {
     return Arrays.copyOf(buf, pos);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/6127753b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
index 807d82a..0c60d3cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.asyncfs;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -44,6 +45,16 @@ public interface AsyncFSOutput extends Closeable {
   void write(byte[] b, int off, int len);
 
   /**
+   * Write an int to the buffer.
+   */
+  void writeInt(int i);
+
+  /**
+   * Copy the data in the given {@code bb} into the buffer.
+   */
+  void write(ByteBuffer bb);
+
+  /**
    * Return the current size of buffered data.
    */
   int buffered();

http://git-wip-us.apache.org/repos/asf/hbase/blob/6127753b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index 576bb29..c9d4e70 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -22,11 +22,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import io.netty.channel.EventLoop;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -36,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -103,68 +103,45 @@ public final class AsyncFSOutputHelper {
         return new DatanodeInfo[0];
       }
 
-      @Override
-      public <A> void flush(final A attachment, final CompletionHandler<Long, ?
super A> handler,
-          final boolean sync) {
-        flushExecutor.execute(new Runnable() {
-
-          @Override
-          public void run() {
-            try {
-              synchronized (out) {
-                out.writeTo(fsOut);
-                out.reset();
-              }
-            } catch (final IOException e) {
-              eventLoop.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                  handler.failed(e, attachment);
-                }
-              });
-              return;
-            }
-            try {
-              if (sync) {
-                fsOut.hsync();
-              } else {
-                fsOut.hflush();
-              }
-              final long pos = fsOut.getPos();
-              eventLoop.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                  handler.completed(pos, attachment);
-                }
-              });
-            } catch (final IOException e) {
-              eventLoop.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                  handler.failed(e, attachment);
-                }
-              });
-            }
+      private <A> void flush0(A attachment, CompletionHandler<Long, ? super A>
handler,
+          boolean sync) {
+        try {
+          synchronized (out) {
+            fsOut.write(out.getBuffer(), 0, out.size());
+            out.reset();
           }
-        });
+        } catch (IOException e) {
+          eventLoop.execute(() -> handler.failed(e, attachment));
+          return;
+        }
+        try {
+          if (sync) {
+            fsOut.hsync();
+          } else {
+            fsOut.hflush();
+          }
+          final long pos = fsOut.getPos();
+          eventLoop.execute(() -> handler.completed(pos, attachment));
+        } catch (final IOException e) {
+          eventLoop.execute(() -> handler.failed(e, attachment));
+        }
+      }
+
+      @Override
+      public <A> void flush(A attachment, CompletionHandler<Long, ? super A>
handler,
+          boolean sync) {
+        flushExecutor.execute(() -> flush0(attachment, handler, sync));
       }
 
       @Override
       public void close() throws IOException {
         try {
-          flushExecutor.submit(new Callable<Void>() {
-
-            @Override
-            public Void call() throws Exception {
-              synchronized (out) {
-                out.writeTo(fsOut);
-                out.reset();
-              }
-              return null;
+          flushExecutor.submit(() -> {
+            synchronized (out) {
+              fsOut.write(out.getBuffer(), 0, out.size());
+              out.reset();
             }
+            return null;
           }).get();
         } catch (InterruptedException e) {
           throw new InterruptedIOException();
@@ -181,6 +158,16 @@ public final class AsyncFSOutputHelper {
       public int buffered() {
         return out.size();
       }
+
+      @Override
+      public void writeInt(int i) {
+        out.writeInt(i);
+      }
+
+      @Override
+      public void write(ByteBuffer bb) {
+        out.write(bb, bb.position(), bb.remaining());
+      }
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6127753b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index e130381..916e534 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -54,7 +54,6 @@ import java.util.Deque;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -349,6 +348,47 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
   }
 
+  private void writeInt0(int i) {
+    buf.ensureWritable(4);
+    if (cryptoCodec == null) {
+      buf.writeInt(i);
+    } else {
+      ByteBuffer inBuffer = ByteBuffer.allocate(4);
+      inBuffer.putInt(0, i);
+      cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), 4));
+      buf.writerIndex(buf.writerIndex() + 4);
+    }
+  }
+
+  @Override
+  public void writeInt(int i) {
+    if (eventLoop.inEventLoop()) {
+      writeInt0(i);
+    } else {
+      eventLoop.submit(() -> writeInt0(i));
+    }
+  }
+
+  private void write0(ByteBuffer bb) {
+    int len = bb.remaining();
+    buf.ensureWritable(len);
+    if (cryptoCodec == null) {
+      buf.writeBytes(bb);
+    } else {
+      cryptoCodec.encrypt(bb, buf.nioBuffer(buf.writerIndex(), len));
+      buf.writerIndex(buf.writerIndex() + len);
+    }
+  }
+
+  @Override
+  public void write(ByteBuffer bb) {
+    if (eventLoop.inEventLoop()) {
+      write0(bb);
+    } else {
+      eventLoop.submit(() -> write0(bb));
+    }
+  }
+
   @Override
   public void write(byte[] b) {
     write(b, 0, b.length);
@@ -370,13 +410,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     if (eventLoop.inEventLoop()) {
       write0(b, off, len);
     } else {
-      eventLoop.submit(new Runnable() {
-
-        @Override
-        public void run() {
-          write0(b, off, len);
-        }
-      }).syncUninterruptibly();
+      eventLoop.submit(() -> write0(b, off, len)).syncUninterruptibly();
     }
   }
 
@@ -385,13 +419,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     if (eventLoop.inEventLoop()) {
       return buf.readableBytes();
     } else {
-      return eventLoop.submit(new Callable<Integer>() {
-
-        @Override
-        public Integer call() throws Exception {
-          return buf.readableBytes();
-        }
-      }).syncUninterruptibly().getNow().intValue();
+      return eventLoop.submit(() -> buf.readableBytes()).syncUninterruptibly().getNow().intValue();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/6127753b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index e2080ff..db3088c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -18,13 +18,13 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import com.google.common.base.Throwables;
-import com.google.common.primitives.Ints;
 
 import io.netty.channel.EventLoop;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
 
 import org.apache.commons.logging.Log;
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
@@ -45,8 +45,8 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  * AsyncWriter for protobuf-based WAL.
  */
 @InterfaceAudience.Private
-public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements
-    AsyncFSWALProvider.AsyncWriter {
+public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
+    implements AsyncFSWALProvider.AsyncWriter {
 
   private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class);
 
@@ -98,7 +98,48 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements
 
   private AsyncFSOutput output;
 
-  private ByteArrayOutputStream buf;
+  private static final class OutputStreamWrapper extends OutputStream
+      implements ByteBufferSupportOutputStream {
+
+    private final AsyncFSOutput out;
+
+    private final byte[] oneByteBuf = new byte[1];
+
+    @Override
+    public void write(int b) throws IOException {
+      oneByteBuf[0] = (byte) b;
+      write(oneByteBuf);
+    }
+
+    public OutputStreamWrapper(AsyncFSOutput out) {
+      this.out = out;
+    }
+
+    @Override
+    public void write(ByteBuffer b, int off, int len) throws IOException {
+      ByteBuffer bb = b.duplicate();
+      bb.position(off);
+      bb.limit(off + len);
+      out.write(bb);
+    }
+
+    @Override
+    public void writeInt(int i) throws IOException {
+      out.writeInt(i);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      out.write(b, off, len);
+    }
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+  }
+
+  private OutputStream asyncOutputWrapper;
 
   public AsyncProtobufLogWriter(EventLoop eventLoop) {
     this.eventLoop = eventLoop;
@@ -106,26 +147,22 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
implements
 
   @Override
   public void append(Entry entry) {
-    buf.reset();
+    int buffered = output.buffered();
     entry.setCompressionContext(compressionContext);
     try {
       entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
-          .writeDelimitedTo(buf);
+          .writeDelimitedTo(asyncOutputWrapper);
     } catch (IOException e) {
       throw new AssertionError("should not happen", e);
     }
-    length.addAndGet(buf.size());
-    output.write(buf.getBuffer(), 0, buf.size());
     try {
       for (Cell cell : entry.getEdit().getCells()) {
-        buf.reset();
         cellEncoder.write(cell);
-        length.addAndGet(buf.size());
-        output.write(buf.getBuffer(), 0, buf.size());
       }
     } catch (IOException e) {
       throw new AssertionError("should not happen", e);
     }
+    length.addAndGet(output.buffered() - buffered);
   }
 
   @Override
@@ -157,22 +194,21 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
implements
       short replication, long blockSize) throws IOException {
     this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
       blockSize, eventLoop);
-    this.buf = new ByteArrayOutputStream();
+    this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }
 
   @Override
   protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException
{
-    buf.reset();
-    header.writeDelimitedTo(buf);
     final BlockingCompletionHandler handler = new BlockingCompletionHandler();
-    eventLoop.execute(new Runnable() {
-
-      @Override
-      public void run() {
-        output.write(ProtobufLogReader.PB_WAL_MAGIC);
-        output.write(buf.getBuffer(), 0, buf.size());
-        output.flush(null, handler, false);
+    eventLoop.execute(() -> {
+      output.write(magic);
+      try {
+        header.writeDelimitedTo(asyncOutputWrapper);
+      } catch (IOException e) {
+        // should not happen
+        throw new AssertionError(e);
       }
+      output.flush(null, handler, false);
     });
     return handler.get();
   }
@@ -180,22 +216,23 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
implements
   @Override
   protected long writeWALTrailerAndMagic(WALTrailer trailer, final byte[] magic)
       throws IOException {
-    buf.reset();
-    trailer.writeTo(buf);
     final BlockingCompletionHandler handler = new BlockingCompletionHandler();
-    eventLoop.execute(new Runnable() {
-      public void run() {
-        output.write(buf.getBuffer(), 0, buf.size());
-        output.write(Ints.toByteArray(buf.size()));
-        output.write(magic);
-        output.flush(null, handler, false);
+    eventLoop.execute(() -> {
+      try {
+        trailer.writeTo(asyncOutputWrapper);
+      } catch (IOException e) {
+        // should not happen
+        throw new AssertionError(e);
       }
+      output.writeInt(trailer.getSerializedSize());
+      output.write(magic);
+      output.flush(null, handler, false);
     });
     return handler.get();
   }
 
   @Override
   protected OutputStream getOutputStreamForCellEncoder() {
-    return buf;
+    return asyncOutputWrapper;
   }
 }


Mime
View raw message