From: git-site-role@apache.org
To: commits@hbase.apache.org
Date: Fri, 21 Jul 2017 22:07:50 -0000
Message-Id: <260b0711422941b5bd48ae052ebf440a@git.apache.org>
Subject: [06/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/ca5b0275/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.Callback.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.Callback.html b/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.Callback.html
index 0c07a2f..c90d203 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.Callback.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.Callback.html
@@ -34,553 +34,554 @@ 026import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus; 027import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; 028 -029import com.google.common.annotations.VisibleForTesting; -030 -031import io.netty.buffer.ByteBuf; -032import io.netty.buffer.ByteBufAllocator; -033import io.netty.channel.Channel; -034import io.netty.channel.ChannelHandler.Sharable; -035import io.netty.channel.ChannelHandlerContext; -036import io.netty.channel.EventLoop; -037import io.netty.channel.SimpleChannelInboundHandler; -038import io.netty.handler.codec.protobuf.ProtobufDecoder; -039import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; -040import io.netty.handler.timeout.IdleStateEvent; -041import io.netty.handler.timeout.IdleStateHandler; -042import io.netty.util.concurrent.Future; -043import io.netty.util.concurrent.Promise; -044import io.netty.util.concurrent.PromiseCombiner; -045 -046import java.io.IOException; -047import java.nio.ByteBuffer; -048import java.util.ArrayDeque; -049import java.util.Collection; -050import java.util.Collections;
-051import java.util.Deque; -052import java.util.IdentityHashMap; -053import java.util.List; -054import java.util.Set; -055import java.util.concurrent.CompletableFuture; -056import java.util.concurrent.TimeUnit; -057import java.util.function.Supplier; -058 -059import org.apache.hadoop.conf.Configuration; -060import org.apache.hadoop.crypto.Encryptor; -061import org.apache.hadoop.fs.Path; -062import org.apache.hadoop.hbase.classification.InterfaceAudience; -063import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; -064import org.apache.hadoop.hbase.util.CancelableProgressable; -065import org.apache.hadoop.hbase.util.FSUtils; -066import org.apache.hadoop.hdfs.DFSClient; -067import org.apache.hadoop.hdfs.DistributedFileSystem; -068import org.apache.hadoop.hdfs.protocol.ClientProtocol; -069import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -070import org.apache.hadoop.hdfs.protocol.LocatedBlock; -071import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -072import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; -073import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; -074import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -075import org.apache.hadoop.util.DataChecksum; -076 -077/** -078 * An asynchronous HDFS output stream implementation which fans out data to all datanodes and only -079 * supports writing files with exactly one block. -080 * <p> -081 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create instances. The main -082 * usage of this class is implementing the WAL, so we only expose a few HDFS configurations in that -083 * method. We place it here under the util package because we want to keep it independent of the WAL -084 * implementation, and thus easier to move to the HDFS project eventually. -085 * <p> -086 * Note that all connections to datanodes run in the same {@link EventLoop}, which means we only -087 * need one thread here. But be careful: we do some blocking operations in the {@link #close()} and -088 * {@link #recoverAndClose(CancelableProgressable)} methods, so do not call them inside the -089 * {@link EventLoop}. And for {@link #write(byte[])}, {@link #write(byte[], int, int)}, -090 * {@link #buffered()} and {@link #flush(boolean)}, if you call them outside the {@link EventLoop} -091 * there will be an extra context switch. -092 * <p> -093 * Advantages compared to DFSOutputStream: -094 * <ol> -095 * <li>The fan-out mechanism, which reduces latency.</li> -096 * <li>The asynchronous WAL can run in the same EventLoop; we can just call write and flush -097 * inside the EventLoop thread, so generally a single thread does all the work.</li> -098 * <li>Fail-fast on datanode connection errors, so the WAL implementation can open a new writer -099 * ASAP.</li> -100 * <li>We benefit from netty's ByteBuf management mechanism.</li> -101 * </ol> -102 */ -103@InterfaceAudience.Private -104public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { -105 -106 // The MAX_PACKET_SIZE is 16MB but it includes the header size and checksum size. So here we set a -107 smaller limit for data size.
-108 private static final int MAX_DATA_LEN = 12 * 1024 * 1024; -109 -110 private final Configuration conf; -111 -112 private final FSUtils fsUtils; -113 -114 private final DistributedFileSystem dfs; -115 -116 private final DFSClient client; -117 -118 private final ClientProtocol namenode; -119 -120 private final String clientName; -121 -122 private final String src; -123 -124 private final long fileId; -125 -126 private final LocatedBlock locatedBlock; -127 -128 private final Encryptor encryptor; -129 -130 private final EventLoop eventLoop; -131 -132 private final List<Channel> datanodeList; -133 -134 private final DataChecksum summer; -135 -136 private final int maxDataLen; -137 -138 private final ByteBufAllocator alloc; -139 -140 private static final class Callback { -141 -142 private final Promise<Void> promise; -143 -144 private final long ackedLength; -145 -146 private final Set<Channel> unfinishedReplicas; -147 -148 public Callback(Promise<Void> promise, long ackedLength, Collection<Channel> replicas) { -149 this.promise = promise; -150 this.ackedLength = ackedLength; -151 if (replicas.isEmpty()) { -152 this.unfinishedReplicas = Collections.emptySet(); -153 } else { -154 this.unfinishedReplicas = -155 Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size())); -156 this.unfinishedReplicas.addAll(replicas); -157 } -158 } -159 } -160 -161 private final Deque<Callback> waitingAckQueue = new ArrayDeque<>(); -162 -163 // this could be different from acked block length because a packet can not start at the middle of -164 // a chunk. -165 private long nextPacketOffsetInBlock = 0L; -166 -167 private long nextPacketSeqno = 0L; -168 -169 private ByteBuf buf; -170 // buf's initial capacity - 4KB -171 private int capacity = 4 * 1024; -172 -173 // LIMIT is 128MB -174 private static final int LIMIT = 128 * 1024 * 1024; -175 -176 private enum State { -177 STREAMING, CLOSING, BROKEN, CLOSED -178 } -179 -180 private State state; -181 -182 private void completed(Channel channel) { -183 if (waitingAckQueue.isEmpty()) { -184 return; -185 } -186 for (Callback c : waitingAckQueue) { -187 if (c.unfinishedReplicas.remove(channel)) { -188 if (c.unfinishedReplicas.isEmpty()) { -189 c.promise.trySuccess(null); -190 // since we will remove the Callback entry from waitingAckQueue if its unfinishedReplicas -191 // is empty, so this could only happen at the head of waitingAckQueue, so we just call -192 // removeFirst here. -193 waitingAckQueue.removeFirst(); -194 // also wake up flush requests which have the same length. -195 for (Callback cb; (cb = waitingAckQueue.peekFirst()) != null;) { -196 if (cb.ackedLength == c.ackedLength) { -197 cb.promise.trySuccess(null); -198 waitingAckQueue.removeFirst(); -199 } else { -200 break; -201 } -202 } -203 } -204 return; -205 } -206 } -207 } -208 -209 private void failed(Channel channel, Supplier<Throwable> errorSupplier) { -210 if (state == State.BROKEN || state == State.CLOSED) { -211 return; -212 } -213 if (state == State.CLOSING) { -214 Callback c = waitingAckQueue.peekFirst(); -215 if (c == null || !c.unfinishedReplicas.contains(channel)) { -216 // nothing, the endBlock request has already finished. -217 return; -218 } -219 } -220 // disable further write, and fail all pending ack. 
-221 state = State.BROKEN; -222 Throwable error = errorSupplier.get(); -223 waitingAckQueue.stream().forEach(c -> c.promise.tryFailure(error)); -224 waitingAckQueue.clear(); -225 datanodeList.forEach(ch -> ch.close()); -226 } -227 -228 @Sharable -229 private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> { -230 -231 private final int timeoutMs; -232 -233 public AckHandler(int timeoutMs) { -234 this.timeoutMs = timeoutMs; -235 } -236 -237 @Override -238 protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception { -239 Status reply = getStatus(ack); -240 if (reply != Status.SUCCESS) { -241 failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " -242 + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress())); -243 return; -244 } -245 if (PipelineAck.isRestartOOBStatus(reply)) { -246 failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " -247 + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress())); -248 return; -249 } -250 if (ack.getSeqno() == HEART_BEAT_SEQNO) { -251 return; -252 } -253 completed(ctx.channel()); -254 } -255 -256 @Override -257 public void channelInactive(ChannelHandlerContext ctx) throws Exception { -258 failed(ctx.channel(), -259 () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed")); -260 } -261 -262 @Override -263 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { -264 failed(ctx.channel(), () -> cause); -265 } -266 -267 @Override -268 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { -269 if (evt instanceof IdleStateEvent) { -270 IdleStateEvent e = (IdleStateEvent) evt; -271 if (e.state() == READER_IDLE) { -272 failed(ctx.channel(), -273 () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); -274 } else if (e.state() == WRITER_IDLE) { -275 PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false); -276 int len = heartbeat.getSerializedSize(); -277 ByteBuf buf = alloc.buffer(len); -278 heartbeat.putInBuffer(buf.nioBuffer(0, len)); -279 buf.writerIndex(len); -280 ctx.channel().writeAndFlush(buf); -281 } -282 return; -283 } -284 super.userEventTriggered(ctx, evt); -285 } -286 } -287 -288 private void setupReceiver(int timeoutMs) { -289 AckHandler ackHandler = new AckHandler(timeoutMs); -290 for (Channel ch : datanodeList) { -291 ch.pipeline().addLast( -292 new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS), -293 new ProtobufVarint32FrameDecoder(), -294 new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler); -295 ch.config().setAutoRead(true); -296 } -297 } -298 -299 FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs, -300 DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId, -301 LocatedBlock locatedBlock, Encryptor encryptor, EventLoop eventLoop, -302 List<Channel> datanodeList, DataChecksum summer, ByteBufAllocator alloc) { -303 this.conf = conf; -304 this.fsUtils = fsUtils; -305 this.dfs = dfs; -306 this.client = client; -307 this.namenode = namenode; -308 this.fileId = fileId; -309 this.clientName = clientName; -310 this.src = src; -311 this.locatedBlock = locatedBlock; -312 this.encryptor = encryptor; -313 this.eventLoop = eventLoop; -314 this.datanodeList = datanodeList; -315 this.summer = summer; -316 this.maxDataLen = MAX_DATA_LEN - 
(MAX_DATA_LEN % summer.getBytesPerChecksum()); -317 this.alloc = alloc; -318 this.buf = alloc.directBuffer(capacity); -319 this.state = State.STREAMING; -320 setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT)); -321 } -322 -323 private void writeInt0(int i) { -324 buf.ensureWritable(4); -325 buf.writeInt(i); -326 } -327 -328 @Override -329 public void writeInt(int i) { -330 if (eventLoop.inEventLoop()) { -331 writeInt0(i); -332 } else { -333 eventLoop.submit(() -> writeInt0(i)); -334 } -335 } -336 -337 private void write0(ByteBuffer bb) { -338 buf.ensureWritable(bb.remaining()); -339 buf.writeBytes(bb); -340 } -341 -342 @Override -343 public void write(ByteBuffer bb) { -344 if (eventLoop.inEventLoop()) { -345 write0(bb); -346 } else { -347 eventLoop.submit(() -> write0(bb)); -348 } -349 } -350 -351 @Override -352 public void write(byte[] b) { -353 write(b, 0, b.length); -354 } -355 -356 private void write0(byte[] b, int off, int len) { -357 buf.ensureWritable(len); -358 buf.writeBytes(b, off, len); -359 } -360 -361 @Override -362 public void write(byte[] b, int off, int len) { -363 if (eventLoop.inEventLoop()) { -364 write0(b, off, len); -365 } else { -366 eventLoop.submit(() -> write0(b, off, len)).syncUninterruptibly(); -367 } -368 } -369 -370 @Override -371 public int buffered() { -372 if (eventLoop.inEventLoop()) { -373 return buf.readableBytes(); -374 } else { -375 return eventLoop.submit(() -> buf.readableBytes()).syncUninterruptibly().getNow().intValue(); -376 } -377 } -378 -379 @Override -380 public DatanodeInfo[] getPipeline() { -381 return locatedBlock.getLocations(); -382 } -383 -384 private Promise<Void> flushBuffer(ByteBuf dataBuf, long nextPacketOffsetInBlock, -385 boolean syncBlock) { -386 int dataLen = dataBuf.readableBytes(); -387 int chunkLen = summer.getBytesPerChecksum(); -388 int trailingPartialChunkLen = dataLen % chunkLen; -389 int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 
1 : 0); -390 int checksumLen = numChecks * summer.getChecksumSize(); -391 ByteBuf checksumBuf = alloc.directBuffer(checksumLen); -392 summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen)); -393 checksumBuf.writerIndex(checksumLen); -394 PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock, -395 nextPacketSeqno, false, dataLen, syncBlock); -396 int headerLen = header.getSerializedSize(); -397 ByteBuf headerBuf = alloc.buffer(headerLen); -398 header.putInBuffer(headerBuf.nioBuffer(0, headerLen)); -399 headerBuf.writerIndex(headerLen); -400 -401 long ackedLength = nextPacketOffsetInBlock + dataLen; -402 Promise<Void> promise = eventLoop.<Void> newPromise().addListener(future -> { -403 if (future.isSuccess()) { -404 locatedBlock.getBlock().setNumBytes(ackedLength); -405 } -406 }); -407 waitingAckQueue.addLast(new Callback(promise, ackedLength, datanodeList)); -408 for (Channel ch : datanodeList) { -409 ch.write(headerBuf.duplicate().retain()); -410 ch.write(checksumBuf.duplicate().retain()); -411 ch.writeAndFlush(dataBuf.duplicate().retain()); -412 } -413 checksumBuf.release(); -414 headerBuf.release(); -415 dataBuf.release(); -416 nextPacketSeqno++; -417 return promise; -418 } -419 -420 private void flush0(CompletableFuture<Long> future, boolean syncBlock) { -421 if (state != State.STREAMING) { -422 future.completeExceptionally(new IOException("stream already broken")); -423 return; -424 } -425 int dataLen = buf.readableBytes(); -426 if (encryptor != null) { -427 ByteBuf encryptBuf = alloc.directBuffer(dataLen); -428 try { -429 encryptor.encrypt(buf.nioBuffer(buf.readerIndex(), dataLen), -430 encryptBuf.nioBuffer(0, dataLen)); -431 } catch (IOException e) { -432 encryptBuf.release(); -433 future.completeExceptionally(e); -434 return; -435 } -436 encryptBuf.writerIndex(dataLen); -437 buf.release(); -438 buf = encryptBuf; -439 } -440 long lengthAfterFlush = nextPacketOffsetInBlock + dataLen; -441 if (lengthAfterFlush == locatedBlock.getBlock().getNumBytes()) { -442 // no new data, just return -443 future.complete(locatedBlock.getBlock().getNumBytes()); -444 return; -445 } -446 Callback c = waitingAckQueue.peekLast(); -447 if (c != null && lengthAfterFlush == c.ackedLength) { -448 // just append it to the tail of the waiting ack queue; do not issue a new hflush request. -449 waitingAckQueue.addLast(new Callback(eventLoop.<Void> newPromise().addListener(f -> { -450 if (f.isSuccess()) { -451 future.complete(lengthAfterFlush); -452 } else { -453 future.completeExceptionally(f.cause()); -454 } -455 }), lengthAfterFlush, Collections.<Channel> emptyList())); -456 return; -457 } -458 Promise<Void> promise; -459 if (dataLen > maxDataLen) { -460 // We need to write out the data by multiple packets as the max packet allowed is 16M.
-461 PromiseCombiner combiner = new PromiseCombiner(); -462 long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock; -463 for (int remaining = dataLen; remaining > 0;) { -464 int toWriteDataLen = Math.min(remaining, maxDataLen); -465 combiner.add((Future<Void>) flushBuffer(buf.readRetainedSlice(toWriteDataLen), -466 nextSubPacketOffsetInBlock, syncBlock)); -467 nextSubPacketOffsetInBlock += toWriteDataLen; -468 remaining -= toWriteDataLen; -469 } -470 promise = eventLoop.newPromise(); -471 combiner.finish(promise); -472 } else { -473 promise = flushBuffer(buf.retain(), nextPacketOffsetInBlock, syncBlock); -474 } -475 promise.addListener(f -> { -476 if (f.isSuccess()) { -477 future.complete(lengthAfterFlush); -478 } else { -479 future.completeExceptionally(f.cause()); -480 } -481 }); -482 int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum(); -483 ByteBuf newBuf = alloc.directBuffer(guess(dataLen)).ensureWritable(trailingPartialChunkLen); -484 if (trailingPartialChunkLen != 0) { -485 buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen); -486 } -487 buf.release(); -488 this.buf = newBuf; -489 nextPacketOffsetInBlock += dataLen - trailingPartialChunkLen; -490 } -491 -492 /** -493 * Flush the buffer out to datanodes. -494 * @param syncBlock will call hsync if true, otherwise hflush. -495 * @return A CompletableFuture that holds the acked length after flushing. -496 */ -497 public CompletableFuture<Long> flush(boolean syncBlock) { -498 CompletableFuture<Long> future = new CompletableFuture<>(); -499 if (eventLoop.inEventLoop()) { -500 flush0(future, syncBlock); -501 } else { -502 eventLoop.execute(() -> flush0(future, syncBlock)); -503 } -504 return future; -505 } -506 -507 private void endBlock(Promise<Void> promise, long size) { -508 if (state != State.STREAMING) { -509 promise.tryFailure(new IOException("stream already broken")); -510 return; -511 } -512 if (!waitingAckQueue.isEmpty()) { -513 promise.tryFailure(new IllegalStateException("should call flush first before calling close")); -514 return; -515 } -516 state = State.CLOSING; -517 PacketHeader header = new PacketHeader(4, size, nextPacketSeqno, true, 0, false); -518 buf.release(); -519 buf = null; -520 int headerLen = header.getSerializedSize(); -521 ByteBuf headerBuf = alloc.directBuffer(headerLen); -522 header.putInBuffer(headerBuf.nioBuffer(0, headerLen)); -523 headerBuf.writerIndex(headerLen); -524 waitingAckQueue.add(new Callback(promise, size, datanodeList)); -525 datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.duplicate().retain())); -526 headerBuf.release(); -527 } -528 -529 /** -530 * The close method used when an error has occurred. Currently we just call recoverFileLease. -531 */ -532 @Override -533 public void recoverAndClose(CancelableProgressable reporter) throws IOException { -534 assert !eventLoop.inEventLoop(); -535 datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly()); -536 endFileLease(client, fileId); -537 fsUtils.recoverFileLease(dfs, new Path(src), conf, -538 reporter == null ? new CancelOnClose(client) : reporter); -539 } -540 -541 /** -542 * End the current block and complete the file at the namenode. You should call -543 * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
-544 */ -545 @Override -546 public void close() throws IOException { -547 assert !eventLoop.inEventLoop(); -548 Promise<Void> promise = eventLoop.newPromise(); -549 eventLoop.execute(() -> endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes())); -550 promise.addListener(f -> datanodeList.forEach(ch -> ch.close())).syncUninterruptibly(); -551 datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly()); -552 completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId); -553 } -554 -555 @VisibleForTesting -556 int guess(int bytesWritten) { -557 // if the bytesWritten is greater than the current capacity -558 // always increase the capacity in powers of 2. -559 if (bytesWritten > this.capacity) { -560 // Ensure we don't cross the LIMIT -561 if ((this.capacity << 1) <= LIMIT) { -562 // increase the capacity in the range of power of 2 -563 this.capacity = this.capacity << 1; -564 } -565 } else { -566 // if we see that the bytesWritten is lesser we could again decrease -567 // the capacity by dividing it by 2 if the bytesWritten is satisfied by -568 // that reduction -569 if ((this.capacity >> 1) >= bytesWritten) { -570 this.capacity = this.capacity >> 1; -571 } -572 } -573 return this.capacity; -574 } -575} +029 +030import io.netty.buffer.ByteBuf; +031import io.netty.buffer.ByteBufAllocator; +032import io.netty.channel.Channel; +033import io.netty.channel.ChannelHandler.Sharable; +034import io.netty.channel.ChannelHandlerContext; +035import io.netty.channel.EventLoop; +036import io.netty.channel.SimpleChannelInboundHandler; +037import io.netty.handler.codec.protobuf.ProtobufDecoder; +038import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +039import io.netty.handler.timeout.IdleStateEvent; +040import io.netty.handler.timeout.IdleStateHandler; +041import io.netty.util.concurrent.Future; +042import io.netty.util.concurrent.Promise; +043import io.netty.util.concurrent.PromiseCombiner; +044 +045import java.io.IOException; +046import java.nio.ByteBuffer; +047import java.util.ArrayDeque; +048import java.util.Collection; +049import java.util.Collections; +050import java.util.Deque; +051import java.util.IdentityHashMap; +052import java.util.List; +053import java.util.Set; +054import java.util.concurrent.CompletableFuture; +055import java.util.concurrent.TimeUnit; +056import java.util.function.Supplier; +057 +058import org.apache.hadoop.conf.Configuration; +059import org.apache.hadoop.crypto.Encryptor; +060import org.apache.hadoop.fs.Path; +061import org.apache.hadoop.hbase.classification.InterfaceAudience; +062import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; +063import org.apache.hadoop.hbase.util.CancelableProgressable; +064import org.apache.hadoop.hbase.util.FSUtils; +065import org.apache.hadoop.hdfs.DFSClient; +066import org.apache.hadoop.hdfs.DistributedFileSystem; +067import org.apache.hadoop.hdfs.protocol.ClientProtocol; +068import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +069import org.apache.hadoop.hdfs.protocol.LocatedBlock; +070import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +071import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; +072import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; +073import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +074import org.apache.hadoop.util.DataChecksum; +075 +076import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +077 +078/** +079 * An 
asynchronous HDFS output stream implementation which fans out data to all datanodes and only +080 * supports writing files with exactly one block. +081 * <p> +082 * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create instances. The main +083 * usage of this class is implementing the WAL, so we only expose a few HDFS configurations in that +084 * method. We place it here under the util package because we want to keep it independent of the WAL +085 * implementation, and thus easier to move to the HDFS project eventually. +086 * <p> +087 * Note that all connections to datanodes run in the same {@link EventLoop}, which means we only +088 * need one thread here. But be careful: we do some blocking operations in the {@link #close()} and +089 * {@link #recoverAndClose(CancelableProgressable)} methods, so do not call them inside the +090 * {@link EventLoop}. And for {@link #write(byte[])}, {@link #write(byte[], int, int)}, +091 * {@link #buffered()} and {@link #flush(boolean)}, if you call them outside the {@link EventLoop} +092 * there will be an extra context switch. +093 * <p> +094 * Advantages compared to DFSOutputStream: +095 * <ol> +096 * <li>The fan-out mechanism, which reduces latency.</li> +097 * <li>The asynchronous WAL can run in the same EventLoop; we can just call write and flush +098 * inside the EventLoop thread, so generally a single thread does all the work.</li> +099 * <li>Fail-fast on datanode connection errors, so the WAL implementation can open a new writer +100 * ASAP.</li> +101 * <li>We benefit from netty's ByteBuf management mechanism.</li> +102 * </ol> +103 */ +104@InterfaceAudience.Private +105public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { +106 +107 // The MAX_PACKET_SIZE is 16MB but it includes the header size and checksum size. So here we set a +108 smaller limit for data size. +109 private static final int MAX_DATA_LEN = 12 * 1024 * 1024; +110 +111 private final Configuration conf; +112 +113 private final FSUtils fsUtils; +114 +115 private final DistributedFileSystem dfs; +116 +117 private final DFSClient client; +118 +119 private final ClientProtocol namenode; +120 +121 private final String clientName; +122 +123 private final String src; +124 +125 private final long fileId; +126 +127 private final LocatedBlock locatedBlock; +128 +129 private final Encryptor encryptor; +130 +131 private final EventLoop eventLoop; +132 +133 private final List<Channel> datanodeList; +134 +135 private final DataChecksum summer; +136 +137 private final int maxDataLen; +138 +139 private final ByteBufAllocator alloc; +140 +141 private static final class Callback { +142 +143 private final Promise<Void> promise; +144 +145 private final long ackedLength; +146 +147 private final Set<Channel> unfinishedReplicas; +148 +149 public Callback(Promise<Void> promise, long ackedLength, Collection<Channel> replicas) { +150 this.promise = promise; +151 this.ackedLength = ackedLength; +152 if (replicas.isEmpty()) { +153 this.unfinishedReplicas = Collections.emptySet(); +154 } else { +155 this.unfinishedReplicas = +156 Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size())); +157 this.unfinishedReplicas.addAll(replicas); +158 } +159 } +160 } +161 +162 private final Deque<Callback> waitingAckQueue = new ArrayDeque<>(); +163 +164 // this could be different from acked block length because a packet can not start at the middle of +165 a chunk.
+166 private long nextPacketOffsetInBlock = 0L; +167 +168 private long nextPacketSeqno = 0L; +169 +170 private ByteBuf buf; +171 // buf's initial capacity - 4KB +172 private int capacity = 4 * 1024; +173 +174 // LIMIT is 128MB +175 private static final int LIMIT = 128 * 1024 * 1024; +176 +177 private enum State { +178 STREAMING, CLOSING, BROKEN, CLOSED +179 } +180 +181 private State state; +182 +183 private void completed(Channel channel) { +184 if (waitingAckQueue.isEmpty()) { +185 return; +186 } +187 for (Callback c : waitingAckQueue) { +188 if (c.unfinishedReplicas.remove(channel)) { +189 if (c.unfinishedReplicas.isEmpty()) { +190 c.promise.trySuccess(null); +191 // since we will remove the Callback entry from waitingAckQueue if its unfinishedReplicas +192 // is empty, so this could only happen at the head of waitingAckQueue, so we just call +193 // removeFirst here. +194 waitingAckQueue.removeFirst(); +195 // also wake up flush requests which have the same length. +196 for (Callback cb; (cb = waitingAckQueue.peekFirst()) != null;) { +197 if (cb.ackedLength == c.ackedLength) { +198 cb.promise.trySuccess(null); +199 waitingAckQueue.removeFirst(); +200 } else { +201 break; +202 } +203 } +204 } +205 return; +206 } +207 } +208 } +209 +210 private void failed(Channel channel, Supplier<Throwable> errorSupplier) { +211 if (state == State.BROKEN || state == State.CLOSED) { +212 return; +213 } +214 if (state == State.CLOSING) { +215 Callback c = waitingAckQueue.peekFirst(); +216 if (c == null || !c.unfinishedReplicas.contains(channel)) { +217 // nothing, the endBlock request has already finished. +218 return; +219 } +220 } +221 // disable further write, and fail all pending ack. +222 state = State.BROKEN; +223 Throwable error = errorSupplier.get(); +224 waitingAckQueue.stream().forEach(c -> c.promise.tryFailure(error)); +225 waitingAckQueue.clear(); +226 datanodeList.forEach(ch -> ch.close()); +227 } +228 +229 @Sharable +230 private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> { +231 +232 private final int timeoutMs; +233 +234 public AckHandler(int timeoutMs) { +235 this.timeoutMs = timeoutMs; +236 } +237 +238 @Override +239 protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception { +240 Status reply = getStatus(ack); +241 if (reply != Status.SUCCESS) { +242 failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " +243 + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress())); +244 return; +245 } +246 if (PipelineAck.isRestartOOBStatus(reply)) { +247 failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " +248 + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress())); +249 return; +250 } +251 if (ack.getSeqno() == HEART_BEAT_SEQNO) { +252 return; +253 } +254 completed(ctx.channel()); +255 } +256 +257 @Override +258 public void channelInactive(ChannelHandlerContext ctx) throws Exception { +259 failed(ctx.channel(), +260 () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed")); +261 } +262 +263 @Override +264 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +265 failed(ctx.channel(), () -> cause); +266 } +267 +268 @Override +269 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { +270 if (evt instanceof IdleStateEvent) { +271 IdleStateEvent e = (IdleStateEvent) evt; +272 if (e.state() == READER_IDLE) { +273 
failed(ctx.channel(), +274 () -> new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); +275 } else if (e.state() == WRITER_IDLE) { +276 PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false); +277 int len = heartbeat.getSerializedSize(); +278 ByteBuf buf = alloc.buffer(len); +279 heartbeat.putInBuffer(buf.nioBuffer(0, len)); +280 buf.writerIndex(len); +281 ctx.channel().writeAndFlush(buf); +282 } +283 return; +284 } +285 super.userEventTriggered(ctx, evt); +286 } +287 } +288 +289 private void setupReceiver(int timeoutMs) { +290 AckHandler ackHandler = new AckHandler(timeoutMs); +291 for (Channel ch : datanodeList) { +292 ch.pipeline().addLast( +293 new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS), +294 new ProtobufVarint32FrameDecoder(), +295 new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler); +296 ch.config().setAutoRead(true); +297 } +298 } +299 +300 FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs, +301 DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId, +302 LocatedBlock locatedBlock, Encryptor encryptor, EventLoop eventLoop, +303 List<Channel> datanodeList, DataChecksum summer, ByteBufAllocator alloc) { +304 this.conf = conf; +305 this.fsUtils = fsUtils; +306 this.dfs = dfs; +307 this.client = client; +308 this.namenode = namenode; +309 this.fileId = fileId; +310 this.clientName = clientName; +311 this.src = src; +312 this.locatedBlock = locatedBlock; +313 this.encryptor = encryptor; +314 this.eventLoop = eventLoop; +315 this.datanodeList = datanodeList; +316 this.summer = summer; +317 this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum()); +318 this.alloc = alloc; +319 this.buf = alloc.directBuffer(capacity); +320 this.state = State.STREAMING; +321 setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT)); +322 } +323 +324 private void writeInt0(int i) { +325 buf.ensureWritable(4); +326 buf.writeInt(i); +327 } +328 +329 @Override +330 public void writeInt(int i) { +331 if (eventLoop.inEventLoop()) { +332 writeInt0(i); +333 } else { +334 eventLoop.submit(() -> writeInt0(i)); +335 } +336 } +337 +338 private void write0(ByteBuffer bb) { +339 buf.ensureWritable(bb.remaining()); +340 buf.writeBytes(bb); +341 } +342 +343 @Override +344 public void write(ByteBuffer bb) { +345 if (eventLoop.inEventLoop()) { +346 write0(bb); +347 } else { +348 eventLoop.submit(() -> write0(bb)); +349 } +350 } +351 +352 @Override +353 public void write(byte[] b) { +354 write(b, 0, b.length); +355 } +356 +357 private void write0(byte[] b, int off, int len) { +358 buf.ensureWritable(len); +359 buf.writeBytes(b, off, len); +360 } +361 +362 @Override +363 public void write(byte[] b, int off, int len) { +364 if (eventLoop.inEventLoop()) { +365 write0(b, off, len); +366 } else { +367 eventLoop.submit(() -> write0(b, off, len)).syncUninterruptibly(); +368 } +369 } +370 +371 @Override +372 public int buffered() { +373 if (eventLoop.inEventLoop()) { +374 return buf.readableBytes(); +375 } else { +376 return eventLoop.submit(() -> buf.readableBytes()).syncUninterruptibly().getNow().intValue(); +377 } +378 } +379 +380 @Override +381 public DatanodeInfo[] getPipeline() { +382 return locatedBlock.getLocations(); +383 } +384 +385 private Promise<Void> flushBuffer(ByteBuf dataBuf, long nextPacketOffsetInBlock, +386 boolean syncBlock) { +387 int dataLen = dataBuf.readableBytes(); +388 int chunkLen = 
summer.getBytesPerChecksum(); +389 int trailingPartialChunkLen = dataLen % chunkLen; +390 int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0); +391 int checksumLen = numChecks * summer.getChecksumSize(); +392 ByteBuf checksumBuf = alloc.directBuffer(checksumLen); +393 summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen)); +394
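----------------------------------------------------------------------

A few reference notes on the class shown in this diff. First, a hedged usage sketch based only on the methods visible in the listing; how the instance is obtained from FanOutOneBlockAsyncDFSOutputHelper.createOutput is not shown here, so the surrounding context is illustrative:

// Hypothetical caller; "out" comes from FanOutOneBlockAsyncDFSOutputHelper.createOutput,
// whose exact argument list is not part of this diff.
static void writeAndFlush(FanOutOneBlockAsyncDFSOutput out, byte[] walEntryBytes)
    throws java.io.IOException {
  out.write(walEntryBytes);               // buffered into the internal ByteBuf
  out.flush(false)                        // false = hflush semantics, true = hsync
      .whenComplete((ackedLength, error) -> {
        if (error != null) {
          // fail-fast: open a new writer ASAP, and call recoverAndClose
          // outside the EventLoop
        }
      });
  out.close();                            // blocking; never call inside the EventLoop
}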
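The per-packet checksum sizing in flushBuffer follows directly from the arithmetic in the listing. A worked example, assuming the common HDFS defaults of 512-byte checksum chunks and 4-byte CRC32C checksums (those two values are assumptions, not taken from this diff):

public class ChecksumMath {
  public static void main(String[] args) {
    int chunkLen = 512;      // summer.getBytesPerChecksum(), assumed default
    int checksumSize = 4;    // summer.getChecksumSize(), assumed default
    int dataLen = 1300;      // example payload length

    int trailingPartialChunkLen = dataLen % chunkLen;                            // 276
    int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0); // 2 + 1 = 3
    int checksumLen = numChecks * checksumSize;                                  // 12 bytes

    // Each datanode then receives: a PacketHeader describing a body of
    // 4 + checksumLen + dataLen bytes, followed by the checksums, then the data.
    System.out.println(numChecks + " chunks, " + checksumLen + " checksum bytes");
  }
}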
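The constructor rounds MAX_DATA_LEN down to a multiple of bytesPerChecksum so that every full-size packet ends on a chunk boundary, and flush0 uses that bound to split an oversized flush into several packets whose promises are joined by a PromiseCombiner. A self-contained sketch of just the splitting arithmetic (the bytesPerChecksum of 500 and the 30 MB flush are made-up numbers chosen so the rounding is visible):

public class PacketSplit {
  public static void main(String[] args) {
    // maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % bytesPerChecksum)
    // 12 * 1024 * 1024 = 12582912; 12582912 % 500 = 412 -> maxDataLen = 12582500
    int maxDataLen = 12 * 1024 * 1024 - (12 * 1024 * 1024) % 500;
    long nextSubPacketOffsetInBlock = 0L;
    for (int remaining = 30_000_000; remaining > 0; ) {
      int toWriteDataLen = Math.min(remaining, maxDataLen); // 12582500, 12582500, 4835000
      // the real code calls flushBuffer(buf.readRetainedSlice(toWriteDataLen), ...)
      // here and adds the returned promise to a PromiseCombiner
      System.out.println("packet at offset " + nextSubPacketOffsetInBlock
          + ", len " + toWriteDataLen);
      nextSubPacketOffsetInBlock += toWriteDataLen;
      remaining -= toWriteDataLen;
    }
  }
}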
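AckHandler's idle handling pairs with the IdleStateHandler that setupReceiver installs on each datanode channel: READER_IDLE after timeoutMs fails the channel, while WRITER_IDLE after timeoutMs / 2 sends a heartbeat packet so the pipeline is not torn down. A minimal netty sketch of that pattern; failChannel and sendHeartbeat are placeholders, not methods from the source:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

class IdleSketch extends ChannelInboundHandlerAdapter {
  private final int timeoutMs;

  IdleSketch(int timeoutMs) {
    this.timeoutMs = timeoutMs;
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
      IdleState state = ((IdleStateEvent) evt).state();
      if (state == IdleState.READER_IDLE) {
        // no ack within timeoutMs: treat the datanode connection as dead
        failChannel(ctx);
      } else if (state == IdleState.WRITER_IDLE) {
        // nothing sent for timeoutMs / 2: the real code serializes a
        // PacketHeader with seqno HEART_BEAT_SEQNO and flushes it
        sendHeartbeat(ctx);
      }
      return;
    }
    super.userEventTriggered(ctx, evt);
  }

  private void failChannel(ChannelHandlerContext ctx) { ctx.close(); }   // placeholder
  private void sendHeartbeat(ChannelHandlerContext ctx) { ctx.flush(); } // placeholder
}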
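Finally, the guess(int) method near the end of the old listing keeps the buffer capacity a power of two between the initial 4KB and LIMIT: it doubles when the last flush overflowed the current capacity and halves when half the capacity would still hold the data. A self-contained demo of that policy (the flush sizes are made up; LIMIT and the 4KB start value come from the source):

public class GuessDemo {
  private static final int LIMIT = 128 * 1024 * 1024;
  private int capacity = 4 * 1024;

  int guess(int bytesWritten) {
    if (bytesWritten > capacity) {
      if ((capacity << 1) <= LIMIT) {
        capacity <<= 1;                  // grow by doubling, capped at LIMIT
      }
    } else if ((capacity >> 1) >= bytesWritten) {
      capacity >>= 1;                    // shrink when half would still fit
    }
    return capacity;
  }

  public static void main(String[] args) {
    GuessDemo d = new GuessDemo();
    System.out.println(d.guess(10_000)); // 8192  (doubled, still too small)
    System.out.println(d.guess(10_000)); // 16384 (doubled again)
    System.out.println(d.guess(3_000));  // 8192  (halved; 8192 >= 3000)
  }
}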