From: git-site-role@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Wed, 12 Jul 2017 15:03:01 -0000
Message-Id: <8ba6cf7e41b44f0284e626393410b23f@git.apache.org>
Subject: [40/51] [partial] hbase-site git commit: Published site at 82d554e3783372cc6b05489452c815b57c06f6cd.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/2777c693/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.BlockAdder.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.BlockAdder.html b/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.BlockAdder.html
index 0a32350..cf44d69 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.BlockAdder.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.BlockAdder.html
@@ -75,735 +75,796 @@
 067import org.apache.hadoop.conf.Configuration;
 068import org.apache.hadoop.crypto.CryptoProtocolVersion;
 069import org.apache.hadoop.crypto.Encryptor;
-070import org.apache.hadoop.fs.FileSystem;
-071import org.apache.hadoop.fs.FileSystemLinkResolver;
-072import org.apache.hadoop.fs.Path;
-073import org.apache.hadoop.fs.UnresolvedLinkException;
-074import org.apache.hadoop.fs.permission.FsPermission;
-075import org.apache.hadoop.hbase.classification.InterfaceAudience;
-076import org.apache.hadoop.hbase.client.ConnectionUtils;
-077import org.apache.hadoop.hbase.util.CancelableProgressable;
-078import org.apache.hadoop.hbase.util.FSUtils;
-079import org.apache.hadoop.hdfs.DFSClient;
-080import org.apache.hadoop.hdfs.DFSOutputStream;
-081import org.apache.hadoop.hdfs.DistributedFileSystem;
-082import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-083import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-084import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-085import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-086import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-087import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-088import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-089import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
-090import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
-091import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
-092import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
-093import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-094import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
-095import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
-096import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
-097import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
-098import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
-099import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-100import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
-101import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
-102import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-103import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-104import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
-105import org.apache.hadoop.io.EnumSetWritable;
-106import org.apache.hadoop.ipc.RemoteException;
-107import org.apache.hadoop.net.NetUtils;
-108import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
-109import org.apache.hadoop.security.token.Token;
-110import org.apache.hadoop.util.DataChecksum;
-111
-112/**
-113 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
-114 */
-115@InterfaceAudience.Private
-116public final class FanOutOneBlockAsyncDFSOutputHelper {
-117
-118  private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputHelper.class);
-119
-120  private FanOutOneBlockAsyncDFSOutputHelper() {
-121  }
-122
-123  // use pooled allocator for performance.
-124  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
-125
-126  // copied from DFSPacket since it is package private.
-127  public static final long HEART_BEAT_SEQNO = -1L;
-128
-129  // Timeouts for communicating with DataNode for streaming writes/reads
-130  public static final int READ_TIMEOUT = 60 * 1000;
-131  public static final int READ_TIMEOUT_EXTENSION = 5 * 1000;
-132  public static final int WRITE_TIMEOUT = 8 * 60 * 1000;
-133
-134  // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
-135  // getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
-136  // get from proto directly, or combined by the reply field of the proto and a ECN object. See
-137  // createPipelineAckStatusGetter for more details.
-138  private interface PipelineAckStatusGetter {
-139    Status get(PipelineAckProto ack);
-140  }
-141
-142  private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
-143
-144  // StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So here
-145  // we need to use reflection to set it.See createStorageTypeSetter for more details.
-146  private interface StorageTypeSetter {
-147    OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
-148  }
-149
-150  private static final StorageTypeSetter STORAGE_TYPE_SETTER;
-151
-152  // helper class for calling add block method on namenode. There is a addBlockFlags parameter for
-153  // hadoop 2.8 or later. See createBlockAdder for more details.
-154  private interface BlockAdder {
-155
-156    LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
-157        ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
-158        throws IOException;
-159  }
-160
-161  private static final BlockAdder BLOCK_ADDER;
-162
-163  private interface LeaseManager {
-164
-165    void begin(DFSClient client, long inodeId);
-166
-167    void end(DFSClient client, long inodeId);
-168  }
-169
-170  private static final LeaseManager LEASE_MANAGER;
-171
-172  // This is used to terminate a recoverFileLease call when FileSystem is already closed.
-173  // isClientRunning is not public so we need to use reflection.
-174  private interface DFSClientAdaptor {
-175
-176    boolean isClientRunning(DFSClient client);
-177  }
-178
-179  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
-180
-181  // helper class for convert protos.
-182  private interface PBHelper {
-183
-184    ExtendedBlockProto convert(ExtendedBlock b);
-185
-186    TokenProto convert(Token<?> tok);
-187  }
-188
-189  private static final PBHelper PB_HELPER;
-190
-191  // helper class for creating data checksum.
-192  private interface ChecksumCreater {
-193    DataChecksum createChecksum(Object conf);
-194  }
-195
-196  private static final ChecksumCreater CHECKSUM_CREATER;
-197
-198  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
-199    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
-200    isClientRunningMethod.setAccessible(true);
-201    return new DFSClientAdaptor() {
-202
-203      @Override
-204      public boolean isClientRunning(DFSClient client) {
-205        try {
-206          return (Boolean) isClientRunningMethod.invoke(client);
-207        } catch (IllegalAccessException | InvocationTargetException e) {
-208          throw new RuntimeException(e);
-209        }
-210      }
-211    };
-212  }
-213
-214  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
-215    Method beginFileLeaseMethod =
-216        DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
-217    beginFileLeaseMethod.setAccessible(true);
-218    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
-219    endFileLeaseMethod.setAccessible(true);
-220    return new LeaseManager() {
-221
-222      @Override
-223      public void begin(DFSClient client, long inodeId) {
-224        try {
-225          beginFileLeaseMethod.invoke(client, inodeId, null);
-226        } catch (IllegalAccessException | InvocationTargetException e) {
-227          throw new RuntimeException(e);
-228        }
-229      }
-230
-231      @Override
-232      public void end(DFSClient client, long inodeId) {
-233        try {
-234          endFileLeaseMethod.invoke(client, inodeId);
-235        } catch (IllegalAccessException | InvocationTargetException e) {
-236          throw new RuntimeException(e);
-237        }
-238      }
-239    };
-240  }
-241
-242  private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
-243      throws NoSuchMethodException {
-244    Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
-245    @SuppressWarnings("rawtypes")
-246    Class<? extends Enum> ecnClass;
-247    try {
-248      ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
-249          .asSubclass(Enum.class);
-250    } catch (ClassNotFoundException e) {
-251      String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please "
-252          + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
-253          + "HBASE-16110 for more information.";
-254      LOG.error(msg, e);
-255      throw new Error(msg, e);
-256    }
-257    @SuppressWarnings("unchecked")
-258    Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
-259    Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
-260    Method combineHeaderMethod =
-261        PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
-262    Method getStatusFromHeaderMethod =
-263        PipelineAck.class.getMethod("getStatusFromHeader", int.class);
-264    return new PipelineAckStatusGetter() {
-265
-266      @Override
-267      public Status get(PipelineAckProto ack) {
-268        try {
-269          @SuppressWarnings("unchecked")
-270          List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
-271          Integer headerFlag;
-272          if (flagList.isEmpty()) {
-273            Status reply = (Status) getReplyMethod.invoke(ack, 0);
-274            headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
-275          } else {
-276            headerFlag = flagList.get(0);
-277          }
-278          return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
-279        } catch (IllegalAccessException | InvocationTargetException e) {
-280          throw new RuntimeException(e);
-281        }
-282      }
-283    };
-284  }
-285
-286  private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
-287      throws NoSuchMethodException {
-288    Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
-289    return new PipelineAckStatusGetter() {
-290
-291      @Override
-292      public Status get(PipelineAckProto ack) {
-293        try {
-294          return (Status) getStatusMethod.invoke(ack, 0);
-295        } catch (IllegalAccessException | InvocationTargetException e) {
-296          throw new RuntimeException(e);
-297        }
-298      }
-299    };
-300  }
-301
-302  private static PipelineAckStatusGetter createPipelineAckStatusGetter()
-303      throws NoSuchMethodException {
-304    try {
-305      return createPipelineAckStatusGetter27();
-306    } catch (NoSuchMethodException e) {
-307      LOG.debug("Can not get expected methods, should be hadoop 2.6-", e);
-308    }
-309    return createPipelineAckStatusGetter26();
-310  }
-311
-312  private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
-313    Method setStorageTypeMethod =
-314        OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
-315    ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
-316    for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
-317      builder.put(storageTypeProto.name(), storageTypeProto);
-318    }
-319    ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
-320    return new StorageTypeSetter() {
-321
-322      @Override
-323      public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
-324        Object protoEnum = name2ProtoEnum.get(storageType.name());
-325        try {
-326          setStorageTypeMethod.invoke(builder, protoEnum);
-327        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
-328          throw new RuntimeException(e);
-329        }
-330        return builder;
-331      }
-332    };
-333  }
-334
-335  private static BlockAdder createBlockAdder() throws NoSuchMethodException {
-336    for (Method method : ClientProtocol.class.getMethods()) {
-337      if (method.getName().equals("addBlock")) {
-338        Method addBlockMethod = method;
-339        Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
-340        if (paramTypes[paramTypes.length - 1] == String[].class) {
-341          return new BlockAdder() {
-342
-343            @Override
-344            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
-345                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
-346                String[] favoredNodes) throws IOException {
-347              try {
-348                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
-349                    excludeNodes, fileId, favoredNodes);
-350              } catch (IllegalAccessException e) {
-351                throw new RuntimeException(e);
-352              } catch (InvocationTargetException e) {
-353                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
-354                throw new RuntimeException(e);
-355              }
-356            }
-357          };
-358        } else {
-359          return new BlockAdder() {
-360
-361            @Override
-362            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
-363                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
-364                String[] favoredNodes) throws IOException {
-365              try {
-366                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
-367                    excludeNodes, fileId, favoredNodes, null);
-368              } catch (IllegalAccessException e) {
-369                throw new RuntimeException(e);
-370              } catch (InvocationTargetException e) {
-371                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
-372                throw new RuntimeException(e);
-373              }
-374            }
-375          };
-376        }
-377      }
-378    }
-379    throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
-380  }
-381
-382  private static PBHelper createPBHelper() throws NoSuchMethodException {
-383    Class<?> helperClass;
-384    try {
-385      helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
-386    } catch (ClassNotFoundException e) {
-387      LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
-388      helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
-389    }
-390    Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
-391    Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
-392    return new PBHelper() {
-393
-394      @Override
-395      public ExtendedBlockProto convert(ExtendedBlock b) {
-396        try {
-397          return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
-398        } catch (IllegalAccessException | InvocationTargetException e) {
-399          throw new RuntimeException(e);
-400        }
-401      }
-402
-403      @Override
-404      public TokenProto convert(Token<?> tok) {
-405        try {
-406          return (TokenProto) convertTokenMethod.invoke(null, tok);
-407        } catch (IllegalAccessException | InvocationTargetException e) {
-408          throw new RuntimeException(e);
-409        }
-410      }
-411    };
-412  }
-413
-414  private static ChecksumCreater createChecksumCreater28(Class<?> confClass)
-415      throws NoSuchMethodException {
-416    for (Method method : confClass.getMethods()) {
-417      if (method.getName().equals("createChecksum")) {
-418        Method createChecksumMethod = method;
-419        return new ChecksumCreater() {
+070import org.apache.hadoop.fs.CreateFlag;
+071import org.apache.hadoop.fs.FileSystem;
+072import org.apache.hadoop.fs.FileSystemLinkResolver;
+073import org.apache.hadoop.fs.Path;
+074import org.apache.hadoop.fs.UnresolvedLinkException;
+075import org.apache.hadoop.fs.permission.FsPermission;
+076import org.apache.hadoop.hbase.classification.InterfaceAudience;
+077import org.apache.hadoop.hbase.client.ConnectionUtils;
+078import org.apache.hadoop.hbase.util.CancelableProgressable;
+079import org.apache.hadoop.hbase.util.FSUtils;
+080import org.apache.hadoop.hdfs.DFSClient;
+081import org.apache.hadoop.hdfs.DFSOutputStream;
+082import org.apache.hadoop.hdfs.DistributedFileSystem;
+083import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+084import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+085import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+086import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+087import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+088import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+089import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+090import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+091import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+092import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+093import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+094import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+095import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
+096import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+097import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+098import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+099import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+100import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+101import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+102import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+103import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+104import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+105import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+106import org.apache.hadoop.io.EnumSetWritable;
+107import org.apache.hadoop.ipc.RemoteException;
+108import org.apache.hadoop.net.NetUtils;
+109import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+110import org.apache.hadoop.security.token.Token;
+111import org.apache.hadoop.util.DataChecksum;
+112
+113/**
+114 * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
+115 */
+116@InterfaceAudience.Private
+117public final class FanOutOneBlockAsyncDFSOutputHelper {
+118
+119  private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputHelper.class);
+120
+121  private FanOutOneBlockAsyncDFSOutputHelper() {
+122  }
+123
+124  // use pooled allocator for performance.
+125  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
+126
+127  // copied from DFSPacket since it is package private.
+128  public static final long HEART_BEAT_SEQNO = -1L;
+129
+130  // Timeouts for communicating with DataNode for streaming writes/reads
+131  public static final int READ_TIMEOUT = 60 * 1000;
+132  public static final int READ_TIMEOUT_EXTENSION = 5 * 1000;
+133  public static final int WRITE_TIMEOUT = 8 * 60 * 1000;
+134
+135  // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
+136  // getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
+137  // get from proto directly, or combined by the reply field of the proto and a ECN object. See
+138  // createPipelineAckStatusGetter for more details.
+139  private interface PipelineAckStatusGetter {
+140    Status get(PipelineAckProto ack);
+141  }
+142
+143  private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
+144
+145  // StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So here
+146  // we need to use reflection to set it.See createStorageTypeSetter for more details.
+147  private interface StorageTypeSetter {
+148    OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
+149  }
+150
+151  private static final StorageTypeSetter STORAGE_TYPE_SETTER;
+152
+153  // helper class for calling add block method on namenode. There is a addBlockFlags parameter for
+154  // hadoop 2.8 or later. See createBlockAdder for more details.
+155  private interface BlockAdder {
+156
+157    LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+158        ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
+159        throws IOException;
+160  }
+161
+162  private static final BlockAdder BLOCK_ADDER;
+163
+164  private interface LeaseManager {
+165
+166    void begin(DFSClient client, long inodeId);
+167
+168    void end(DFSClient client, long inodeId);
+169  }
+170
+171  private static final LeaseManager LEASE_MANAGER;
+172
+173  // This is used to terminate a recoverFileLease call when FileSystem is already closed.
+174  // isClientRunning is not public so we need to use reflection.
+175  private interface DFSClientAdaptor {
+176
+177    boolean isClientRunning(DFSClient client);
+178  }
+179
+180  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
+181
+182  // helper class for convert protos.
+183  private interface PBHelper {
+184
+185    ExtendedBlockProto convert(ExtendedBlock b);
+186
+187    TokenProto convert(Token<?> tok);
+188  }
+189
+190  private static final PBHelper PB_HELPER;
+191
+192  // helper class for creating data checksum.
+193  private interface ChecksumCreater {
+194    DataChecksum createChecksum(Object conf);
+195  }
+196
+197  private static final ChecksumCreater CHECKSUM_CREATER;
+198
+199  // helper class for creating files.
+200  private interface FileCreator {
+201    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
+202        String clientName, EnumSetWritable<CreateFlag> flag,
+203        boolean createParent, short replication, long blockSize,
+204        CryptoProtocolVersion[] supportedVersions) throws Exception {
+205      try {
+206        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
+207            replication, blockSize, supportedVersions);
+208      } catch (InvocationTargetException e) {
+209        if (e.getCause() instanceof Exception) {
+210          throw (Exception) e.getCause();
+211        } else {
+212          throw new RuntimeException(e.getCause());
+213        }
+214      }
+215    };
+216
+217    Object createObject(ClientProtocol instance, String src, FsPermission masked,
+218        String clientName, EnumSetWritable<CreateFlag> flag,
+219        boolean createParent, short replication, long blockSize,
+220        CryptoProtocolVersion[] supportedVersions) throws Exception;
+221  }
+222
+223  private static final FileCreator FILE_CREATOR;
+224
+225  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
+226    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
+227    isClientRunningMethod.setAccessible(true);
+228    return new DFSClientAdaptor() {
+229
+230      @Override
+231      public boolean isClientRunning(DFSClient client) {
+232        try {
+233          return (Boolean) isClientRunningMethod.invoke(client);
+234        } catch (IllegalAccessException | InvocationTargetException e) {
+235          throw new RuntimeException(e);
+236        }
+237      }
+238    };
+239  }
+240
+241  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
+242    Method beginFileLeaseMethod =
+243        DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
+244    beginFileLeaseMethod.setAccessible(true);
+245    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
+246    endFileLeaseMethod.setAccessible(true);
+247    return new LeaseManager() {
+248
+249      @Override
+250      public void begin(DFSClient client, long inodeId) {
+251        try {
+252          beginFileLeaseMethod.invoke(client, inodeId, null);
+253        } catch (IllegalAccessException | InvocationTargetException e) {
+254          throw new RuntimeException(e);
+255        }
+256      }
+257
+258      @Override
+259      public void end(DFSClient client, long inodeId) {
+260        try {
+261          endFileLeaseMethod.invoke(client, inodeId);
+262        } catch (IllegalAccessException | InvocationTargetException e) {
+263          throw new RuntimeException(e);
+264        }
+265      }
+266    };
+267  }
+268
+269  private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
+270      throws NoSuchMethodException {
+271    Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
+272    @SuppressWarnings("rawtypes")
+273    Class<? extends Enum> ecnClass;
+274    try {
+275      ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
+276          .asSubclass(Enum.class);
+277    } catch (ClassNotFoundException e) {
+278      String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please "
+279          + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+280          + "HBASE-16110 for more information.";
+281      LOG.error(msg, e);
+282      throw new Error(msg, e);
+283    }
+284    @SuppressWarnings("unchecked")
+285    Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
+286    Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
+287    Method combineHeaderMethod =
+288        PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
+289    Method getStatusFromHeaderMethod =
+290        PipelineAck.class.getMethod("getStatusFromHeader", int.class);
+291    return new PipelineAckStatusGetter() {
+292
+293      @Override
+294      public Status get(PipelineAckProto ack) {
+295        try {
+296          @SuppressWarnings("unchecked")
+297          List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
+298          Integer headerFlag;
+299          if (flagList.isEmpty()) {
+300            Status reply = (Status) getReplyMethod.invoke(ack, 0);
+301            headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
+302          } else {
+303            headerFlag = flagList.get(0);
+304          }
+305          return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
+306        } catch (IllegalAccessException | InvocationTargetException e) {
+307          throw new RuntimeException(e);
+308        }
+309      }
+310    };
+311  }
+312
+313  private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
+314      throws NoSuchMethodException {
+315    Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
+316    return new PipelineAckStatusGetter() {
+317
+318      @Override
+319      public Status get(PipelineAckProto ack) {
+320        try {
+321          return (Status) getStatusMethod.invoke(ack, 0);
+322        } catch (IllegalAccessException | InvocationTargetException e) {
+323          throw new RuntimeException(e);
+324        }
+325      }
+326    };
+327  }
+328
+329  private static PipelineAckStatusGetter createPipelineAckStatusGetter()
+330      throws NoSuchMethodException {
+331    try {
+332      return createPipelineAckStatusGetter27();
+333    } catch (NoSuchMethodException e) {
+334      LOG.debug("Can not get expected methods, should be hadoop 2.6-", e);
+335    }
+336    return createPipelineAckStatusGetter26();
+337  }
+338
+339  private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
+340    Method setStorageTypeMethod =
+341        OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
+342    ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
+343    for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
+344      builder.put(storageTypeProto.name(), storageTypeProto);
+345    }
+346    ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
+347    return new StorageTypeSetter() {
+348
+349      @Override
+350      public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
+351        Object protoEnum = name2ProtoEnum.get(storageType.name());
+352        try {
+353          setStorageTypeMethod.invoke(builder, protoEnum);
+354        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+355          throw new RuntimeException(e);
+356        }
+357        return builder;
+358      }
+359    };
+360  }
+361
+362  private static BlockAdder createBlockAdder() throws NoSuchMethodException {
+363    for (Method method : ClientProtocol.class.getMethods()) {
+364      if (method.getName().equals("addBlock")) {
+365        Method addBlockMethod = method;
+366        Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
+367        if (paramTypes[paramTypes.length - 1] == String[].class) {
+368          return new BlockAdder() {
+369
+370            @Override
+371            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+372                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+373                String[] favoredNodes) throws IOException {
+374              try {
+375                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+376                    excludeNodes, fileId, favoredNodes);
+377              } catch (IllegalAccessException e) {
+378                throw new RuntimeException(e);
+379              } catch (InvocationTargetException e) {
+380                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+381                throw new RuntimeException(e);
+382              }
+383            }
+384          };
+385        } else {
+386          return new BlockAdder() {
+387
+388            @Override
+389            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+390                ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+391                String[] favoredNodes) throws IOException {
+392              try {
+393                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+394                    excludeNodes, fileId, favoredNodes, null);
+395              } catch (IllegalAccessException e) {
+396                throw new RuntimeException(e);
+397              } catch (InvocationTargetException e) {
+398                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+399                throw new RuntimeException(e);
+400              }
+401            }
+402          };
+403        }
+404      }
+405    }
+406    throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
+407  }
+408
+409  private static PBHelper createPBHelper() throws NoSuchMethodException {
+410    Class<?> helperClass;
+411    try {
+412      helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient");
+413    } catch (ClassNotFoundException e) {
+414      LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e);
+415      helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+416    }
+417    Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
+418    Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
+419    return new PBHelper() {
 420
-421          @Override
-422          public DataChecksum createChecksum(Object conf) {
-423            try {
-424              return (DataChecksum) createChecksumMethod.invoke(conf, (Object) null);
-425            } catch (IllegalAccessException | InvocationTargetException e) {
-426              throw new RuntimeException(e);
-427            }
-428          }
-429        };
-430      }
-431    }
-432    throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
-433  }
-434
-435  private static ChecksumCreater createChecksumCreater27(Class<?> confClass)
-436      throws NoSuchMethodException {
-437    Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
-438    createChecksumMethod.setAccessible(true);
-439    return new ChecksumCreater() {
+421      @Override
+422      public ExtendedBlockProto convert(ExtendedBlock b) {
+423        try {
+424          return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
+425        } catch (IllegalAccessException | InvocationTargetException e) {
+426          throw new RuntimeException(e);
+427        }
+428      }
+429
+430      @Override
+431      public TokenProto convert(Token<?> tok) {
+432        try {
+433          return (TokenProto) convertTokenMethod.invoke(null, tok);
+434        } catch (IllegalAccessException | InvocationTargetException e) {
+435          throw new RuntimeException(e);
+436        }
+437      }
+438    };
+439  }
 440
-441      @Override
-442      public DataChecksum createChecksum(Object conf) {
-443        try {
-444          return (DataChecksum) createChecksumMethod.invoke(conf);
-445        } catch (IllegalAccessException | InvocationTargetException e) {
-446          throw new RuntimeException(e);
-447        }
-448      }
-449    };
-450  }
-451
-452  private static ChecksumCreater createChecksumCreater()
-453      throws NoSuchMethodException, ClassNotFoundException {
-454    try {
-455      return createChecksumCreater28(
-456          Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
-457    } catch (ClassNotFoundException e) {
-458      LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
-459    }
-460    return createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
-461  }
-462
-463  // cancel the processing if DFSClient is already closed.
-464  static final class CancelOnClose implements CancelableProgressable {
-465
-466    private final DFSClient client;
+441  private static ChecksumCreater createChecksumCreater28(Class<?> confClass)
+442      throws NoSuchMethodException {
+443    for (Method method : confClass.getMethods()) {
+444      if (method.getName().equals("createChecksum")) {
+445        Method createChecksumMethod = method;
+446        return new ChecksumCreater() {
+447
+448          @Override
+449          public DataChecksum createChecksum(Object conf) {
+450            try {
+451              return (DataChecksum) createChecksumMethod.invoke(conf, (Object) null);
+452            } catch (IllegalAccessException | InvocationTargetException e) {
+453              throw new RuntimeException(e);
+454            }
+455          }
+456        };
+457      }
+458    }
+459    throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
+460  }
+461
+462  private static ChecksumCreater createChecksumCreater27(Class<?> confClass)
+463      throws NoSuchMethodException {
+464    Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
+465    createChecksumMethod.setAccessible(true);
+466    return new ChecksumCreater() {
 467
-468    public CancelOnClose(DFSClient client) {
-469      this.client = client;
-470    }
-471
-472    @Override
-473    public boolean progress() {
-474      return DFS_CLIENT_ADAPTOR.isClientRunning(client);
-475    }
-476  }
-477
-478  static {
-479    try {
-480      PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
-481      STORAGE_TYPE_SETTER = createStorageTypeSetter();
-482      BLOCK_ADDER = createBlockAdder();
-483      LEASE_MANAGER = createLeaseManager();
-484      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
-485      PB_HELPER = createPBHelper();
-486      CHECKSUM_CREATER = createChecksumCreater();
-487    } catch (Exception e) {
-488      String msg = "Couldn't properly initialize access to HDFS internals. Please "
-489          + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
-490          + "HBASE-16110 for more information.";
-491      LOG.error(msg, e);
-492      throw new Error(msg, e);
-493    }
-494  }
-495
-496  static void beginFileLease(DFSClient client, long inodeId) {
-497    LEASE_MANAGER.begin(client, inodeId);
-498  }
-499
-500  static void endFileLease(DFSClient client, long inodeId) {
-501    LEASE_MANAGER.end(client, inodeId);
-502  }
-503
-504  static DataChecksum createChecksum(DFSClient client) {
-505    return CHECKSUM_CREATER.createChecksum(client.getConf());
-506  }
-507
-508  static Status getStatus(PipelineAckProto ack) {
-509    return PIPELINE_ACK_STATUS_GETTER.get(ack);
-510  }
-511
-512  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
-513      Promise<Channel> promise, int timeoutMs) {
-514    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
-515        new ProtobufVarint32FrameDecoder(),
-516        new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
-517        new SimpleChannelInboundHandler<BlockOpResponseProto>() {
-518
-519          @Override
-520          protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
-521              throws Exception {
-522            Status pipelineStatus = resp.getStatus();
-523            if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
-524              throw new IOException("datanode " + dnInfo + " is restarting");
-525            }
-526            String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
-527            if (resp.getStatus() != Status.SUCCESS) {
-528              if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
-529                throw new InvalidBlockTokenException("Got access token error" + ", status message "
-530                    + resp.getMessage() + ", " + logInfo);
-531              } else {
-532                throw new IOException("Got error" + ", status=" + resp.getStatus().name()
-533                    + ", status message " + resp.getMessage() + ", " + logInfo);
-534              }
-535            }
-536            // success
-537            ChannelPipeline p = ctx.pipeline();
-538            for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
-539              // do not remove all handlers because we may have wrap or unwrap handlers at the header
-540              // of pipeline.
-541              if (handler instanceof IdleStateHandler