Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5CA3018A05 for ; Fri, 5 Feb 2016 04:32:20 +0000 (UTC) Received: (qmail 86624 invoked by uid 500); 5 Feb 2016 04:32:17 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 86325 invoked by uid 500); 5 Feb 2016 04:32:16 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 81353 invoked by uid 99); 5 Feb 2016 04:32:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Feb 2016 04:32:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4AC40E0B28; Fri, 5 Feb 2016 04:32:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Fri, 05 Feb 2016 04:32:48 -0000 Message-Id: In-Reply-To: <49df44bf87274c6ca4833f867e09c715@git.apache.org> References: <49df44bf87274c6ca4833f867e09c715@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [38/51] [partial] hbase-site git commit: Published site at 18eff3c1c337003b2a419490e621f931d16936fb. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/a8725a46/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.html b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.html index 86ba709..f4bddcd 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.html @@ -420,7 +420,7 @@ 412 requestHeaderBuilder.setCellBlockMeta(cellBlockBuilder.build()); 413 } 414 // Only pass priority if there one. Let zero be same as no priority. -415 if (call.controller.getPriority() != 0) { +415 if (call.controller.getPriority() != PayloadCarryingRpcController.PRIORITY_UNSET) { 416 requestHeaderBuilder.setPriority(call.controller.getPriority()); 417 } 418 @@ -668,96 +668,97 @@ 660 private void handleSaslConnectionFailure(final int currRetries, final Throwable ex, 661 final UserGroupInformation user) throws IOException, InterruptedException { 662 user.doAs(new PrivilegedExceptionAction<Void>() { -663 public Void run() throws IOException, InterruptedException { -664 if (shouldAuthenticateOverKrb()) { -665 if (currRetries < MAX_SASL_RETRIES) { -666 LOG.debug("Exception encountered while connecting to the server : " + ex); -667 //try re-login -668 if (UserGroupInformation.isLoginKeytabBased()) { -669 UserGroupInformation.getLoginUser().reloginFromKeytab(); -670 } else { -671 UserGroupInformation.getLoginUser().reloginFromTicketCache(); -672 } -673 -674 // Should reconnect -675 return null; -676 } else { -677 String msg = "Couldn't setup connection for " + -678 UserGroupInformation.getLoginUser().getUserName() + -679 " to " + serverPrincipal; -680 LOG.warn(msg); -681 throw (IOException) new IOException(msg).initCause(ex); -682 } -683 } else { -684 LOG.warn("Exception encountered while connecting to " + -685 "the server : " + ex); -686 } -687 if (ex instanceof RemoteException) { -688 throw (RemoteException) ex; -689 } -690 if (ex instanceof SaslException) { -691 String msg = "SASL authentication failed." + -692 " The most likely cause is missing or invalid credentials." + -693 " Consider 'kinit'."; -694 LOG.fatal(msg, ex); -695 throw new RuntimeException(msg, ex); -696 } -697 throw new IOException(ex); -698 } -699 }); -700 } -701 -702 public int getConnectionHashCode() { -703 return ConnectionId.hashCode(ticket, serviceName, address); -704 } -705 -706 @Override -707 public int hashCode() { -708 return getConnectionHashCode(); -709 } -710 -711 @Override -712 public boolean equals(Object obj) { -713 if (obj instanceof AsyncRpcChannel) { -714 AsyncRpcChannel channel = (AsyncRpcChannel) obj; -715 return channel.hashCode() == obj.hashCode(); -716 } -717 return false; -718 } -719 +663 @Override +664 public Void run() throws IOException, InterruptedException { +665 if (shouldAuthenticateOverKrb()) { +666 if (currRetries < MAX_SASL_RETRIES) { +667 LOG.debug("Exception encountered while connecting to the server : " + ex); +668 //try re-login +669 if (UserGroupInformation.isLoginKeytabBased()) { +670 UserGroupInformation.getLoginUser().reloginFromKeytab(); +671 } else { +672 UserGroupInformation.getLoginUser().reloginFromTicketCache(); +673 } +674 +675 // Should reconnect +676 return null; +677 } else { +678 String msg = "Couldn't setup connection for " + +679 UserGroupInformation.getLoginUser().getUserName() + +680 " to " + serverPrincipal; +681 LOG.warn(msg); +682 throw (IOException) new IOException(msg).initCause(ex); +683 } +684 } else { +685 LOG.warn("Exception encountered while connecting to " + +686 "the server : " + ex); +687 } +688 if (ex instanceof RemoteException) { +689 throw (RemoteException) ex; +690 } +691 if (ex instanceof SaslException) { +692 String msg = "SASL authentication failed." + +693 " The most likely cause is missing or invalid credentials." + +694 " Consider 'kinit'."; +695 LOG.fatal(msg, ex); +696 throw new RuntimeException(msg, ex); +697 } +698 throw new IOException(ex); +699 } +700 }); +701 } +702 +703 public int getConnectionHashCode() { +704 return ConnectionId.hashCode(ticket, serviceName, address); +705 } +706 +707 @Override +708 public int hashCode() { +709 return getConnectionHashCode(); +710 } +711 +712 @Override +713 public boolean equals(Object obj) { +714 if (obj instanceof AsyncRpcChannel) { +715 AsyncRpcChannel channel = (AsyncRpcChannel) obj; +716 return channel.hashCode() == obj.hashCode(); +717 } +718 return false; +719 } 720 -721 @Override -722 public String toString() { -723 return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; -724 } -725 -726 /** -727 * Listens to call writes and fails if write failed -728 */ -729 private static final class CallWriteListener implements ChannelFutureListener { -730 private final AsyncRpcChannel rpcChannel; -731 private final int id; -732 -733 public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) { -734 this.rpcChannel = asyncRpcChannel; -735 this.id = id; -736 } -737 -738 @Override -739 public void operationComplete(ChannelFuture future) throws Exception { -740 if (!future.isSuccess()) { -741 AsyncCall call = rpcChannel.removePendingCall(id); -742 if (call != null) { -743 if (future.cause() instanceof IOException) { -744 call.setFailed((IOException) future.cause()); -745 } else { -746 call.setFailed(new IOException(future.cause())); -747 } -748 } -749 } -750 } -751 } -752} +721 +722 @Override +723 public String toString() { +724 return this.address.toString() + "/" + this.serviceName + "/" + this.ticket; +725 } +726 +727 /** +728 * Listens to call writes and fails if write failed +729 */ +730 private static final class CallWriteListener implements ChannelFutureListener { +731 private final AsyncRpcChannel rpcChannel; +732 private final int id; +733 +734 public CallWriteListener(AsyncRpcChannel asyncRpcChannel, int id) { +735 this.rpcChannel = asyncRpcChannel; +736 this.id = id; +737 } +738 +739 @Override +740 public void operationComplete(ChannelFuture future) throws Exception { +741 if (!future.isSuccess()) { +742 AsyncCall call = rpcChannel.removePendingCall(id); +743 if (call != null) { +744 if (future.cause() instanceof IOException) { +745 call.setFailed((IOException) future.cause()); +746 } else { +747 call.setFailed(new IOException(future.cause())); +748 } +749 } +750 } +751 } +752 } +753} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/a8725a46/devapidocs/src-html/org/apache/hadoop/hbase/ipc/IPCUtil.CellScannerButNoCodecException.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/IPCUtil.CellScannerButNoCodecException.html b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/IPCUtil.CellScannerButNoCodecException.html index 1b5adb7..8f9e5a8 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/IPCUtil.CellScannerButNoCodecException.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/IPCUtil.CellScannerButNoCodecException.html @@ -25,25 +25,25 @@ 017 */ 018package org.apache.hadoop.hbase.ipc; 019 -020import java.io.ByteArrayInputStream; -021import java.io.DataInput; -022import java.io.IOException; -023import java.io.InputStream; -024import java.io.OutputStream; -025import java.nio.BufferOverflowException; -026import java.nio.ByteBuffer; -027 -028import org.apache.commons.io.IOUtils; -029import org.apache.commons.logging.Log; -030import org.apache.commons.logging.LogFactory; -031import org.apache.hadoop.hbase.classification.InterfaceAudience; -032import org.apache.hadoop.conf.Configurable; -033import org.apache.hadoop.conf.Configuration; -034import org.apache.hadoop.hbase.CellScanner; -035import org.apache.hadoop.hbase.DoNotRetryIOException; -036import org.apache.hadoop.hbase.HBaseIOException; -037import org.apache.hadoop.hbase.codec.Codec; -038import org.apache.hadoop.hbase.io.BoundedByteBufferPool; +020import java.io.DataInput; +021import java.io.IOException; +022import java.io.InputStream; +023import java.io.OutputStream; +024import java.nio.BufferOverflowException; +025import java.nio.ByteBuffer; +026 +027import org.apache.commons.io.IOUtils; +028import org.apache.commons.logging.Log; +029import org.apache.commons.logging.LogFactory; +030import org.apache.hadoop.hbase.classification.InterfaceAudience; +031import org.apache.hadoop.conf.Configurable; +032import org.apache.hadoop.conf.Configuration; +033import org.apache.hadoop.hbase.CellScanner; +034import org.apache.hadoop.hbase.DoNotRetryIOException; +035import org.apache.hadoop.hbase.HBaseIOException; +036import org.apache.hadoop.hbase.codec.Codec; +037import org.apache.hadoop.hbase.io.BoundedByteBufferPool; +038import org.apache.hadoop.hbase.io.ByteBufferInputStream; 039import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 040import org.apache.hadoop.hbase.io.HeapSize; 041import org.apache.hadoop.hbase.util.Bytes; @@ -188,136 +188,134 @@ 180 public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, 181 final byte [] cellBlock) 182 throws IOException { -183 return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length); +183 return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock)); 184 } 185 186 /** 187 * @param codec -188 * @param cellBlock -189 * @param offset -190 * @param length -191 * @return CellScanner to work against the content of <code>cellBlock</code> -192 * @throws IOException -193 */ -194 public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, -195 final byte [] cellBlock, final int offset, final int length) -196 throws IOException { -197 // If compressed, decompress it first before passing it on else we will leak compression -198 // resources if the stream is not closed properly after we let it out. -199 InputStream is = null; -200 if (compressor != null) { -201 // GZIPCodec fails w/ NPE if no configuration. -202 if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); -203 Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); -204 CompressionInputStream cis = -205 compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length), -206 poolDecompressor); -207 ByteBufferOutputStream bbos = null; -208 try { -209 // TODO: This is ugly. The buffer will be resized on us if we guess wrong. -210 // TODO: Reuse buffers. -211 bbos = new ByteBufferOutputStream((length - offset) * -212 this.cellBlockDecompressionMultiplier); -213 IOUtils.copy(cis, bbos); -214 bbos.close(); -215 ByteBuffer bb = bbos.getByteBuffer(); -216 is = new ByteArrayInputStream(bb.array(), 0, bb.limit()); -217 } finally { -218 if (is != null) is.close(); -219 if (bbos != null) bbos.close(); -220 -221 CodecPool.returnDecompressor(poolDecompressor); -222 } -223 } else { -224 is = new ByteArrayInputStream(cellBlock, offset, length); -225 } -226 return codec.getDecoder(is); -227 } -228 -229 /** -230 * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its -231 * serialization. -232 * @return The passed in Message serialized with delimiter. Return null if <code>m</code> is null -233 * @throws IOException -234 */ -235 public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException { -236 if (m == null) return null; -237 int serializedSize = m.getSerializedSize(); -238 int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize); -239 byte [] buffer = new byte[serializedSize + vintSize]; -240 // Passing in a byte array saves COS creating a buffer which it does when using streams. -241 CodedOutputStream cos = CodedOutputStream.newInstance(buffer); -242 // This will write out the vint preamble and the message serialized. -243 cos.writeMessageNoTag(m); -244 cos.flush(); -245 cos.checkNoSpaceLeft(); -246 return ByteBuffer.wrap(buffer); -247 } -248 -249 /** -250 * Write out header, param, and cell block if there is one. -251 * @param dos -252 * @param header -253 * @param param -254 * @param cellBlock -255 * @return Total number of bytes written. -256 * @throws IOException -257 */ -258 public static int write(final OutputStream dos, final Message header, final Message param, -259 final ByteBuffer cellBlock) -260 throws IOException { -261 // Must calculate total size and write that first so other side can read it all in in one -262 // swoop. This is dictated by how the server is currently written. Server needs to change -263 // if we are to be able to write without the length prefixing. -264 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); -265 if (cellBlock != null) totalSize += cellBlock.remaining(); -266 return write(dos, header, param, cellBlock, totalSize); -267 } -268 -269 private static int write(final OutputStream dos, final Message header, final Message param, -270 final ByteBuffer cellBlock, final int totalSize) -271 throws IOException { -272 // I confirmed toBytes does same as DataOutputStream#writeInt. -273 dos.write(Bytes.toBytes(totalSize)); -274 // This allocates a buffer that is the size of the message internally. -275 header.writeDelimitedTo(dos); -276 if (param != null) param.writeDelimitedTo(dos); -277 if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining()); -278 dos.flush(); -279 return totalSize; -280 } -281 -282 /** -283 * Read in chunks of 8K (HBASE-7239) -284 * @param in -285 * @param dest -286 * @param offset -287 * @param len -288 * @throws IOException -289 */ -290 public static void readChunked(final DataInput in, byte[] dest, int offset, int len) -291 throws IOException { -292 int maxRead = 8192; -293 -294 for (; offset < len; offset += maxRead) { -295 in.readFully(dest, offset, Math.min(len - offset, maxRead)); -296 } -297 } -298 -299 /** -300 * @return Size on the wire when the two messages are written with writeDelimitedTo -301 */ -302 public static int getTotalSizeWhenWrittenDelimited(Message ... messages) { -303 int totalSize = 0; -304 for (Message m: messages) { -305 if (m == null) continue; -306 totalSize += m.getSerializedSize(); -307 totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize()); -308 } -309 Preconditions.checkArgument(totalSize < Integer.MAX_VALUE); -310 return totalSize; -311 } -312} +188 * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be +189 * position()'ed at the start of the cell block and limit()'ed at the end. +190 * @return CellScanner to work against the content of <code>cellBlock</code> +191 * @throws IOException +192 */ +193 public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, +194 final ByteBuffer cellBlock) +195 throws IOException { +196 // If compressed, decompress it first before passing it on else we will leak compression +197 // resources if the stream is not closed properly after we let it out. +198 InputStream is = null; +199 if (compressor != null) { +200 // GZIPCodec fails w/ NPE if no configuration. +201 if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); +202 Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); +203 CompressionInputStream cis = +204 compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor); +205 ByteBufferOutputStream bbos = null; +206 try { +207 // TODO: This is ugly. The buffer will be resized on us if we guess wrong. +208 // TODO: Reuse buffers. +209 bbos = new ByteBufferOutputStream(cellBlock.remaining() * +210 this.cellBlockDecompressionMultiplier); +211 IOUtils.copy(cis, bbos); +212 bbos.close(); +213 ByteBuffer bb = bbos.getByteBuffer(); +214 is = new ByteBufferInputStream(bb); +215 } finally { +216 if (is != null) is.close(); +217 if (bbos != null) bbos.close(); +218 +219 CodecPool.returnDecompressor(poolDecompressor); +220 } +221 } else { +222 is = new ByteBufferInputStream(cellBlock); +223 } +224 return codec.getDecoder(is); +225 } +226 +227 /** +228 * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its +229 * serialization. +230 * @return The passed in Message serialized with delimiter. Return null if <code>m</code> is null +231 * @throws IOException +232 */ +233 public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException { +234 if (m == null) return null; +235 int serializedSize = m.getSerializedSize(); +236 int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize); +237 byte [] buffer = new byte[serializedSize + vintSize]; +238 // Passing in a byte array saves COS creating a buffer which it does when using streams. +239 CodedOutputStream cos = CodedOutputStream.newInstance(buffer); +240 // This will write out the vint preamble and the message serialized. +241 cos.writeMessageNoTag(m); +242 cos.flush(); +243 cos.checkNoSpaceLeft(); +244 return ByteBuffer.wrap(buffer); +245 } +246 +247 /** +248 * Write out header, param, and cell block if there is one. +249 * @param dos +250 * @param header +251 * @param param +252 * @param cellBlock +253 * @return Total number of bytes written. +254 * @throws IOException +255 */ +256 public static int write(final OutputStream dos, final Message header, final Message param, +257 final ByteBuffer cellBlock) +258 throws IOException { +259 // Must calculate total size and write that first so other side can read it all in in one +260 // swoop. This is dictated by how the server is currently written. Server needs to change +261 // if we are to be able to write without the length prefixing. +262 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); +263 if (cellBlock != null) totalSize += cellBlock.remaining(); +264 return write(dos, header, param, cellBlock, totalSize); +265 } +266 +267 private static int write(final OutputStream dos, final Message header, final Message param, +268 final ByteBuffer cellBlock, final int totalSize) +269 throws IOException { +270 // I confirmed toBytes does same as DataOutputStream#writeInt. +271 dos.write(Bytes.toBytes(totalSize)); +272 // This allocates a buffer that is the size of the message internally. +273 header.writeDelimitedTo(dos); +274 if (param != null) param.writeDelimitedTo(dos); +275 if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining()); +276 dos.flush(); +277 return totalSize; +278 } +279 +280 /** +281 * Read in chunks of 8K (HBASE-7239) +282 * @param in +283 * @param dest +284 * @param offset +285 * @param len +286 * @throws IOException +287 */ +288 public static void readChunked(final DataInput in, byte[] dest, int offset, int len) +289 throws IOException { +290 int maxRead = 8192; +291 +292 for (; offset < len; offset += maxRead) { +293 in.readFully(dest, offset, Math.min(len - offset, maxRead)); +294 } +295 } +296 +297 /** +298 * @return Size on the wire when the two messages are written with writeDelimitedTo +299 */ +300 public static int getTotalSizeWhenWrittenDelimited(Message ... messages) { +301 int totalSize = 0; +302 for (Message m: messages) { +303 if (m == null) continue; +304 totalSize += m.getSerializedSize(); +305 totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize()); +306 } +307 Preconditions.checkArgument(totalSize < Integer.MAX_VALUE); +308 return totalSize; +309 } +310} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/a8725a46/devapidocs/src-html/org/apache/hadoop/hbase/ipc/IPCUtil.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/IPCUtil.html b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/IPCUtil.html index 1b5adb7..8f9e5a8 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/IPCUtil.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/IPCUtil.html @@ -25,25 +25,25 @@ 017 */ 018package org.apache.hadoop.hbase.ipc; 019 -020import java.io.ByteArrayInputStream; -021import java.io.DataInput; -022import java.io.IOException; -023import java.io.InputStream; -024import java.io.OutputStream; -025import java.nio.BufferOverflowException; -026import java.nio.ByteBuffer; -027 -028import org.apache.commons.io.IOUtils; -029import org.apache.commons.logging.Log; -030import org.apache.commons.logging.LogFactory; -031import org.apache.hadoop.hbase.classification.InterfaceAudience; -032import org.apache.hadoop.conf.Configurable; -033import org.apache.hadoop.conf.Configuration; -034import org.apache.hadoop.hbase.CellScanner; -035import org.apache.hadoop.hbase.DoNotRetryIOException; -036import org.apache.hadoop.hbase.HBaseIOException; -037import org.apache.hadoop.hbase.codec.Codec; -038import org.apache.hadoop.hbase.io.BoundedByteBufferPool; +020import java.io.DataInput; +021import java.io.IOException; +022import java.io.InputStream; +023import java.io.OutputStream; +024import java.nio.BufferOverflowException; +025import java.nio.ByteBuffer; +026 +027import org.apache.commons.io.IOUtils; +028import org.apache.commons.logging.Log; +029import org.apache.commons.logging.LogFactory; +030import org.apache.hadoop.hbase.classification.InterfaceAudience; +031import org.apache.hadoop.conf.Configurable; +032import org.apache.hadoop.conf.Configuration; +033import org.apache.hadoop.hbase.CellScanner; +034import org.apache.hadoop.hbase.DoNotRetryIOException; +035import org.apache.hadoop.hbase.HBaseIOException; +036import org.apache.hadoop.hbase.codec.Codec; +037import org.apache.hadoop.hbase.io.BoundedByteBufferPool; +038import org.apache.hadoop.hbase.io.ByteBufferInputStream; 039import org.apache.hadoop.hbase.io.ByteBufferOutputStream; 040import org.apache.hadoop.hbase.io.HeapSize; 041import org.apache.hadoop.hbase.util.Bytes; @@ -188,136 +188,134 @@ 180 public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, 181 final byte [] cellBlock) 182 throws IOException { -183 return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length); +183 return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock)); 184 } 185 186 /** 187 * @param codec -188 * @param cellBlock -189 * @param offset -190 * @param length -191 * @return CellScanner to work against the content of <code>cellBlock</code> -192 * @throws IOException -193 */ -194 public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, -195 final byte [] cellBlock, final int offset, final int length) -196 throws IOException { -197 // If compressed, decompress it first before passing it on else we will leak compression -198 // resources if the stream is not closed properly after we let it out. -199 InputStream is = null; -200 if (compressor != null) { -201 // GZIPCodec fails w/ NPE if no configuration. -202 if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); -203 Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); -204 CompressionInputStream cis = -205 compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length), -206 poolDecompressor); -207 ByteBufferOutputStream bbos = null; -208 try { -209 // TODO: This is ugly. The buffer will be resized on us if we guess wrong. -210 // TODO: Reuse buffers. -211 bbos = new ByteBufferOutputStream((length - offset) * -212 this.cellBlockDecompressionMultiplier); -213 IOUtils.copy(cis, bbos); -214 bbos.close(); -215 ByteBuffer bb = bbos.getByteBuffer(); -216 is = new ByteArrayInputStream(bb.array(), 0, bb.limit()); -217 } finally { -218 if (is != null) is.close(); -219 if (bbos != null) bbos.close(); -220 -221 CodecPool.returnDecompressor(poolDecompressor); -222 } -223 } else { -224 is = new ByteArrayInputStream(cellBlock, offset, length); -225 } -226 return codec.getDecoder(is); -227 } -228 -229 /** -230 * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its -231 * serialization. -232 * @return The passed in Message serialized with delimiter. Return null if <code>m</code> is null -233 * @throws IOException -234 */ -235 public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException { -236 if (m == null) return null; -237 int serializedSize = m.getSerializedSize(); -238 int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize); -239 byte [] buffer = new byte[serializedSize + vintSize]; -240 // Passing in a byte array saves COS creating a buffer which it does when using streams. -241 CodedOutputStream cos = CodedOutputStream.newInstance(buffer); -242 // This will write out the vint preamble and the message serialized. -243 cos.writeMessageNoTag(m); -244 cos.flush(); -245 cos.checkNoSpaceLeft(); -246 return ByteBuffer.wrap(buffer); -247 } -248 -249 /** -250 * Write out header, param, and cell block if there is one. -251 * @param dos -252 * @param header -253 * @param param -254 * @param cellBlock -255 * @return Total number of bytes written. -256 * @throws IOException -257 */ -258 public static int write(final OutputStream dos, final Message header, final Message param, -259 final ByteBuffer cellBlock) -260 throws IOException { -261 // Must calculate total size and write that first so other side can read it all in in one -262 // swoop. This is dictated by how the server is currently written. Server needs to change -263 // if we are to be able to write without the length prefixing. -264 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); -265 if (cellBlock != null) totalSize += cellBlock.remaining(); -266 return write(dos, header, param, cellBlock, totalSize); -267 } -268 -269 private static int write(final OutputStream dos, final Message header, final Message param, -270 final ByteBuffer cellBlock, final int totalSize) -271 throws IOException { -272 // I confirmed toBytes does same as DataOutputStream#writeInt. -273 dos.write(Bytes.toBytes(totalSize)); -274 // This allocates a buffer that is the size of the message internally. -275 header.writeDelimitedTo(dos); -276 if (param != null) param.writeDelimitedTo(dos); -277 if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining()); -278 dos.flush(); -279 return totalSize; -280 } -281 -282 /** -283 * Read in chunks of 8K (HBASE-7239) -284 * @param in -285 * @param dest -286 * @param offset -287 * @param len -288 * @throws IOException -289 */ -290 public static void readChunked(final DataInput in, byte[] dest, int offset, int len) -291 throws IOException { -292 int maxRead = 8192; -293 -294 for (; offset < len; offset += maxRead) { -295 in.readFully(dest, offset, Math.min(len - offset, maxRead)); -296 } -297 } -298 -299 /** -300 * @return Size on the wire when the two messages are written with writeDelimitedTo -301 */ -302 public static int getTotalSizeWhenWrittenDelimited(Message ... messages) { -303 int totalSize = 0; -304 for (Message m: messages) { -305 if (m == null) continue; -306 totalSize += m.getSerializedSize(); -307 totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize()); -308 } -309 Preconditions.checkArgument(totalSize < Integer.MAX_VALUE); -310 return totalSize; -311 } -312} +188 * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be +189 * position()'ed at the start of the cell block and limit()'ed at the end. +190 * @return CellScanner to work against the content of <code>cellBlock</code> +191 * @throws IOException +192 */ +193 public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, +194 final ByteBuffer cellBlock) +195 throws IOException { +196 // If compressed, decompress it first before passing it on else we will leak compression +197 // resources if the stream is not closed properly after we let it out. +198 InputStream is = null; +199 if (compressor != null) { +200 // GZIPCodec fails w/ NPE if no configuration. +201 if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); +202 Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); +203 CompressionInputStream cis = +204 compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor); +205 ByteBufferOutputStream bbos = null; +206 try { +207 // TODO: This is ugly. The buffer will be resized on us if we guess wrong. +208 // TODO: Reuse buffers. +209 bbos = new ByteBufferOutputStream(cellBlock.remaining() * +210 this.cellBlockDecompressionMultiplier); +211 IOUtils.copy(cis, bbos); +212 bbos.close(); +213 ByteBuffer bb = bbos.getByteBuffer(); +214 is = new ByteBufferInputStream(bb); +215 } finally { +216 if (is != null) is.close(); +217 if (bbos != null) bbos.close(); +218 +219 CodecPool.returnDecompressor(poolDecompressor); +220 } +221 } else { +222 is = new ByteBufferInputStream(cellBlock); +223 } +224 return codec.getDecoder(is); +225 } +226 +227 /** +228 * @param m Message to serialize delimited; i.e. w/ a vint of its size preceeding its +229 * serialization. +230 * @return The passed in Message serialized with delimiter. Return null if <code>m</code> is null +231 * @throws IOException +232 */ +233 public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException { +234 if (m == null) return null; +235 int serializedSize = m.getSerializedSize(); +236 int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize); +237 byte [] buffer = new byte[serializedSize + vintSize]; +238 // Passing in a byte array saves COS creating a buffer which it does when using streams. +239 CodedOutputStream cos = CodedOutputStream.newInstance(buffer); +240 // This will write out the vint preamble and the message serialized. +241 cos.writeMessageNoTag(m); +242 cos.flush(); +243 cos.checkNoSpaceLeft(); +244 return ByteBuffer.wrap(buffer); +245 } +246 +247 /** +248 * Write out header, param, and cell block if there is one. +249 * @param dos +250 * @param header +251 * @param param +252 * @param cellBlock +253 * @return Total number of bytes written. +254 * @throws IOException +255 */ +256 public static int write(final OutputStream dos, final Message header, final Message param, +257 final ByteBuffer cellBlock) +258 throws IOException { +259 // Must calculate total size and write that first so other side can read it all in in one +260 // swoop. This is dictated by how the server is currently written. Server needs to change +261 // if we are to be able to write without the length prefixing. +262 int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); +263 if (cellBlock != null) totalSize += cellBlock.remaining(); +264 return write(dos, header, param, cellBlock, totalSize); +265 } +266 +267 private static int write(final OutputStream dos, final Message header, final Message param, +268 final ByteBuffer cellBlock, final int totalSize) +269 throws IOException { +270 // I confirmed toBytes does same as DataOutputStream#writeInt. +271 dos.write(Bytes.toBytes(totalSize)); +272 // This allocates a buffer that is the size of the message internally. +273 header.writeDelimitedTo(dos); +274 if (param != null) param.writeDelimitedTo(dos); +275 if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining()); +276 dos.flush(); +277 return totalSize; +278 } +279 +280 /** +281 * Read in chunks of 8K (HBASE-7239) +282 * @param in +283 * @param dest +284 * @param offset +285 * @param len +286 * @throws IOException +287 */ +288 public static void readChunked(final DataInput in, byte[] dest, int offset, int len) +289 throws IOException { +290 int maxRead = 8192; +291 +292 for (; offset < len; offset += maxRead) { +293 in.readFully(dest, offset, Math.min(len - offset, maxRead)); +294 } +295 } +296 +297 /** +298 * @return Size on the wire when the two messages are written with writeDelimitedTo +299 */ +300 public static int getTotalSizeWhenWrittenDelimited(Message ... messages) { +301 int totalSize = 0; +302 for (Message m: messages) { +303 if (m == null) continue; +304 totalSize += m.getSerializedSize(); +305 totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize()); +306 } +307 Preconditions.checkArgument(totalSize < Integer.MAX_VALUE); +308 return totalSize; +309 } +310} http://git-wip-us.apache.org/repos/asf/hbase-site/blob/a8725a46/devapidocs/src-html/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.html b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.html index 6dfcee3..d4aa690 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.html @@ -43,14 +43,14 @@ 035@InterfaceAudience.Private 036public class PayloadCarryingRpcController 037 extends TimeLimitedRpcController implements CellScannable { -038 /** -039 * Priority to set on this request. Set it here in controller so available composing the -040 * request. This is the ordained way of setting priorities going forward. We will be -041 * undoing the old annotation-based mechanism. -042 */ -043 // Currently only multi call makes use of this. Eventually this should be only way to set -044 // priority. -045 private int priority = HConstants.NORMAL_QOS; +038 +039 public static final int PRIORITY_UNSET = -1; +040 /** +041 * Priority to set on this request. Set it here in controller so available composing the +042 * request. This is the ordained way of setting priorities going forward. We will be +043 * undoing the old annotation-based mechanism. +044 */ +045 private int priority = PRIORITY_UNSET; 046 047 /** 048 * They are optionally set on construction, cleared after we make the call, and then optionally @@ -75,43 +75,44 @@ 067 /** 068 * @return One-shot cell scanner (you cannot back it up and restart) 069 */ -070 public CellScanner cellScanner() { -071 return cellScanner; -072 } -073 -074 public void setCellScanner(final CellScanner cellScanner) { -075 this.cellScanner = cellScanner; -076 } -077 -078 /** -079 * @param priority Priority for this request; should fall roughly in the range -080 * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS} -081 */ -082 public void setPriority(int priority) { -083 this.priority = priority; -084 } -085 -086 /** -087 * @param tn Set priority based off the table we are going against. -088 */ -089 public void setPriority(final TableName tn) { -090 this.priority = -091 (tn != null && tn.isSystemTable())? HConstants.SYSTEMTABLE_QOS: HConstants.NORMAL_QOS; -092 } -093 -094 /** -095 * @return The priority of this request -096 */ -097 public int getPriority() { -098 return priority; -099 } -100 -101 @Override public void reset() { -102 super.reset(); -103 priority = 0; -104 cellScanner = null; -105 } -106} +070 @Override +071 public CellScanner cellScanner() { +072 return cellScanner; +073 } +074 +075 public void setCellScanner(final CellScanner cellScanner) { +076 this.cellScanner = cellScanner; +077 } +078 +079 /** +080 * @param priority Priority for this request; should fall roughly in the range +081 * {@link HConstants#NORMAL_QOS} to {@link HConstants#HIGH_QOS} +082 */ +083 public void setPriority(int priority) { +084 this.priority = priority; +085 } +086 +087 /** +088 * @param tn Set priority based off the table we are going against. +089 */ +090 public void setPriority(final TableName tn) { +091 this.priority = +092 (tn != null && tn.isSystemTable())? HConstants.SYSTEMTABLE_QOS: HConstants.NORMAL_QOS; +093 } +094 +095 /** +096 * @return The priority of this request +097 */ +098 public int getPriority() { +099 return priority; +100 } +101 +102 @Override public void reset() { +103 super.reset(); +104 priority = 0; +105 cellScanner = null; +106 } +107}