hadoop-hdfs-commits mailing list archives

From: cnaur...@apache.org
Subject: svn commit: r1567994 [2/3] - in /hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src...
Date: Thu, 13 Feb 2014 18:21:48 GMT
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Thu Feb 13 18:21:46 2014
@@ -46,9 +46,6 @@ import org.apache.hadoop.fs.HasEnhancedB
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.net.DomainPeer;
-import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -82,7 +79,6 @@ implements ByteBufferReadable, CanSetDro
     HasEnhancedByteBufferAccess {
   @VisibleForTesting
   static boolean tcpReadsDisabledForTesting = false;
-  private final PeerCache peerCache;
   private final DFSClient dfsClient;
   private boolean closed = false;
   private final String src;
@@ -190,8 +186,6 @@ implements ByteBufferReadable, CanSetDro
     private long totalZeroCopyBytesRead;
   }
   
-  private final FileInputStreamCache fileInputStreamCache;
-
   /**
    * This variable tracks the number of failures since the start of the
    * most recent user-facing operation. That is to say, it should be reset
@@ -223,10 +217,6 @@ implements ByteBufferReadable, CanSetDro
     this.verifyChecksum = verifyChecksum;
     this.buffersize = buffersize;
     this.src = src;
-    this.peerCache = dfsClient.peerCache;
-    this.fileInputStreamCache = new FileInputStreamCache(
-        dfsClient.getConf().shortCircuitStreamsCacheSize,
-        dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
     this.cachingStrategy =
         dfsClient.getDefaultReadCachingStrategy();
     openInfo();
@@ -572,18 +562,28 @@ implements ByteBufferReadable, CanSetDro
       try {
         ExtendedBlock blk = targetBlock.getBlock();
         Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
-        blockReader = getBlockReader(targetAddr, chosenNode, src, blk,
-            accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
-            buffersize, verifyChecksum, dfsClient.clientName, cachingStrategy);
+        blockReader = new BlockReaderFactory(dfsClient.getConf()).
+            setInetSocketAddress(targetAddr).
+            setRemotePeerFactory(dfsClient).
+            setDatanodeInfo(chosenNode).
+            setFileName(src).
+            setBlock(blk).
+            setBlockToken(accessToken).
+            setStartOffset(offsetIntoBlock).
+            setVerifyChecksum(verifyChecksum).
+            setClientName(dfsClient.clientName).
+            setLength(blk.getNumBytes() - offsetIntoBlock).
+            setCachingStrategy(cachingStrategy).
+            setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
+            setClientCacheContext(dfsClient.getClientContext()).
+            setUserGroupInformation(dfsClient.ugi).
+            setConfiguration(dfsClient.getConfiguration()).
+            build();
         if(connectFailedOnce) {
           DFSClient.LOG.info("Successfully connected to " + targetAddr +
                              " for " + blk);
         }
         return chosenNode;
-      } catch (AccessControlException ex) {
-        DFSClient.LOG.warn("Short circuit access failed " + ex);
-        dfsClient.disableLegacyBlockReaderLocal();
-        continue;
       } catch (IOException ex) {
         if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
@@ -635,7 +635,6 @@ implements ByteBufferReadable, CanSetDro
       blockReader = null;
     }
     super.close();
-    fileInputStreamCache.close();
     closed = true;
   }
 
@@ -932,9 +931,11 @@ implements ByteBufferReadable, CanSetDro
       // or fetchBlockAt(). Always get the latest list of locations at the 
       // start of the loop.
       CachingStrategy curCachingStrategy;
+      boolean allowShortCircuitLocalReads;
       synchronized (this) {
         block = getBlockAt(block.getStartOffset(), false);
         curCachingStrategy = cachingStrategy;
+        allowShortCircuitLocalReads = !shortCircuitForbidden();
       }
       DNAddrPair retval = chooseDataNode(block);
       DatanodeInfo chosenNode = retval.info;
@@ -943,11 +944,24 @@ implements ByteBufferReadable, CanSetDro
           
       try {
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
-            
         int len = (int) (end - start + 1);
-        reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
-            blockToken, start, len, buffersize, verifyChecksum,
-            dfsClient.clientName, curCachingStrategy);
+        reader = new BlockReaderFactory(dfsClient.getConf()).
+            setInetSocketAddress(targetAddr).
+            setRemotePeerFactory(dfsClient).
+            setDatanodeInfo(chosenNode).
+            setFileName(src).
+            setBlock(block.getBlock()).
+            setBlockToken(blockToken).
+            setStartOffset(start).
+            setVerifyChecksum(verifyChecksum).
+            setClientName(dfsClient.clientName).
+            setLength(len).
+            setCachingStrategy(curCachingStrategy).
+            setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
+            setClientCacheContext(dfsClient.getClientContext()).
+            setUserGroupInformation(dfsClient.ugi).
+            setConfiguration(dfsClient.getConfiguration()).
+            build();
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
@@ -960,10 +974,6 @@ implements ByteBufferReadable, CanSetDro
                  e.getPos() + " from " + chosenNode);
         // we want to remember what we have tried
         addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
-      } catch (AccessControlException ex) {
-        DFSClient.LOG.warn("Short circuit access failed " + ex);
-        dfsClient.disableLegacyBlockReaderLocal();
-        continue;
       } catch (IOException e) {
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
@@ -1022,194 +1032,6 @@ implements ByteBufferReadable, CanSetDro
     return false;
   }
 
-  private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
-    Peer peer = null;
-    boolean success = false;
-    Socket sock = null;
-    try {
-      sock = dfsClient.socketFactory.createSocket();
-      NetUtils.connect(sock, addr,
-        dfsClient.getRandomLocalInterfaceAddr(),
-        dfsClient.getConf().socketTimeout);
-      peer = TcpPeerServer.peerFromSocketAndKey(sock, 
-          dfsClient.getDataEncryptionKey());
-      success = true;
-      return peer;
-    } finally {
-      if (!success) {
-        IOUtils.closeQuietly(peer);
-        IOUtils.closeQuietly(sock);
-      }
-    }
-  }
-
-  /**
-   * Retrieve a BlockReader suitable for reading.
-   * This method will reuse the cached connection to the DN if appropriate.
-   * Otherwise, it will create a new connection.
-   * Throwing an IOException from this method is basically equivalent to 
-   * declaring the DataNode bad, so we try to connect a lot of different ways
-   * before doing that.
-   *
-   * @param dnAddr  Address of the datanode
-   * @param chosenNode Chosen datanode information
-   * @param file  File location
-   * @param block  The Block object
-   * @param blockToken  The access token for security
-   * @param startOffset  The read offset, relative to block head
-   * @param len  The number of bytes to read
-   * @param bufferSize  The IO buffer size (not the client buffer size)
-   * @param verifyChecksum  Whether to verify checksum
-   * @param clientName  Client name
-   * @param CachingStrategy  caching strategy to use
-   * @return New BlockReader instance
-   */
-  protected BlockReader getBlockReader(InetSocketAddress dnAddr,
-                                       DatanodeInfo chosenNode,
-                                       String file,
-                                       ExtendedBlock block,
-                                       Token<BlockTokenIdentifier> blockToken,
-                                       long startOffset,
-                                       long len,
-                                       int bufferSize,
-                                       boolean verifyChecksum,
-                                       String clientName,
-                                       CachingStrategy curCachingStrategy)
-      throws IOException {
-    // Firstly, we check to see if we have cached any file descriptors for
-    // local blocks.  If so, we can just re-use those file descriptors.
-    FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
-    if (fis != null) {
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
-            "the FileInputStreamCache.");
-      }
-      return new BlockReaderLocal.Builder(dfsClient.getConf()).
-          setFilename(file).
-          setBlock(block).
-          setStartOffset(startOffset).
-          setStreams(fis).
-          setDatanodeID(chosenNode).
-          setVerifyChecksum(verifyChecksum).
-          setBlockMetadataHeader(BlockMetadataHeader.
-              preadHeader(fis[1].getChannel())).
-          setFileInputStreamCache(fileInputStreamCache).
-          setCachingStrategy(curCachingStrategy).
-          build();
-    }
-    
-    // If the legacy local block reader is enabled and we are reading a local
-    // block, try to create a BlockReaderLocalLegacy.  The legacy local block
-    // reader implements local reads in the style first introduced by HDFS-2246.
-    if ((dfsClient.useLegacyBlockReaderLocal()) &&
-        DFSClient.isLocalAddress(dnAddr) &&
-        (!shortCircuitForbidden())) {
-      try {
-        return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient,
-            clientName, block, blockToken, chosenNode, startOffset);
-      } catch (IOException e) {
-        DFSClient.LOG.warn("error creating legacy BlockReaderLocal.  " +
-            "Disabling legacy local reads.", e);
-        dfsClient.disableLegacyBlockReaderLocal();
-      }
-    }
-
-    // Look for cached domain peers.
-    int cacheTries = 0;
-    DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory();
-    BlockReader reader = null;
-    final int nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
-    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
-      Peer peer = peerCache.get(chosenNode, true);
-      if (peer == null) break;
-      try {
-        boolean allowShortCircuitLocalReads = dfsClient.getConf().
-            shortCircuitLocalReads && (!shortCircuitForbidden());
-        reader = BlockReaderFactory.newBlockReader(
-            dfsClient.getConf(), file, block, blockToken, startOffset,
-            len, verifyChecksum, clientName, peer, chosenNode, 
-            dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, curCachingStrategy);
-        return reader;
-      } catch (IOException ex) {
-        DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
-            "Closing stale " + peer, ex);
-      } finally {
-        if (reader == null) {
-          IOUtils.closeQuietly(peer);
-        }
-      }
-    }
-
-    // Try to create a DomainPeer.
-    DomainSocket domSock = dsFactory.create(dnAddr, this);
-    if (domSock != null) {
-      Peer peer = new DomainPeer(domSock);
-      try {
-        boolean allowShortCircuitLocalReads = dfsClient.getConf().
-            shortCircuitLocalReads && (!shortCircuitForbidden());
-        reader = BlockReaderFactory.newBlockReader(
-            dfsClient.getConf(), file, block, blockToken, startOffset,
-            len, verifyChecksum, clientName, peer, chosenNode,
-            dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, curCachingStrategy);
-        return reader;
-      } catch (IOException e) {
-        DFSClient.LOG.warn("failed to connect to " + domSock, e);
-      } finally {
-        if (reader == null) {
-         // If the Peer that we got the error from was a DomainPeer,
-         // mark the socket path as bad, so that newDataSocket will not try 
-         // to re-open this socket for a while.
-         dsFactory.disableDomainSocketPath(domSock.getPath());
-         IOUtils.closeQuietly(peer);
-        }
-      }
-    }
-
-    // Look for cached peers.
-    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
-      Peer peer = peerCache.get(chosenNode, false);
-      if (peer == null) break;
-      try {
-        reader = BlockReaderFactory.newBlockReader(
-            dfsClient.getConf(), file, block, blockToken, startOffset,
-            len, verifyChecksum, clientName, peer, chosenNode, 
-            dsFactory, peerCache, fileInputStreamCache, false,
-            curCachingStrategy);
-        return reader;
-      } catch (IOException ex) {
-        DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
-          peer, ex);
-      } finally {
-        if (reader == null) {
-          IOUtils.closeQuietly(peer);
-        }
-      }
-    }
-    if (tcpReadsDisabledForTesting) {
-      throw new IOException("TCP reads are disabled.");
-    }
-    // Try to create a new remote peer.
-    Peer peer = newTcpPeer(dnAddr);
-    try {
-      reader = BlockReaderFactory.newBlockReader(dfsClient.getConf(), file,
-          block, blockToken, startOffset, len, verifyChecksum, clientName,
-          peer, chosenNode, dsFactory, peerCache, fileInputStreamCache, false,
-          curCachingStrategy);
-      return reader;
-    } catch (IOException ex) {
-      DFSClient.LOG.debug(
-          "Exception while getting block reader, closing stale " + peer, ex);
-      throw ex;
-    } finally {
-      if (reader == null) {
-        IOUtils.closeQuietly(peer);
-      }
-    }
-  }
-
-
   /**
    * Read bytes starting from the specified position.
    * 
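
The TCP path deleted here (newTcpPeer) does not vanish: the new builder is handed a RemotePeerFactory (setRemotePeerFactory(dfsClient) in the hunks above), and an anonymous implementation of the same interface appears in the JspHelper hunk further down. Below is a sketch of such a factory method, modeled directly on the removed newTcpPeer body; treating DFSClient as the implementor is an inference from the setter call, not something shown in this diff:

    // Sketch: RemotePeerFactory implementation modeled on the removed
    // newTcpPeer() above. The interface shape (newConnectedPeer) is taken
    // from the JspHelper hunk below; placing it on DFSClient is an assumption.
    public Peer newConnectedPeer(InetSocketAddress addr) throws IOException {
      Peer peer = null;
      Socket sock = null;
      boolean success = false;
      try {
        sock = socketFactory.createSocket();
        NetUtils.connect(sock, addr, getRandomLocalInterfaceAddr(),
            getConf().socketTimeout);
        peer = TcpPeerServer.peerFromSocketAndKey(sock, getDataEncryptionKey());
        success = true;
        return peer;
      } finally {
        if (!success) {
          // close both handles on any failure path, exactly as before
          IOUtils.closeQuietly(peer);
          IOUtils.closeQuietly(sock);
        }
      }
    }
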
@@ -1555,8 +1377,7 @@ implements ByteBufferReadable, CanSetDro
     long blockStartInFile = currentLocatedBlock.getStartOffset();
     long blockPos = curPos - blockStartInFile;
     long limit = blockPos + length;
-    ClientMmap clientMmap =
-        blockReader.getClientMmap(opts, dfsClient.getMmapManager());
+    ClientMmap clientMmap = blockReader.getClientMmap(opts);
     if (clientMmap == null) {
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
@@ -1565,17 +1386,25 @@ implements ByteBufferReadable, CanSetDro
       }
       return null;
     }
-    seek(pos + length);
-    ByteBuffer buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
-    buffer.position((int)blockPos);
-    buffer.limit((int)limit);
-    clientMmap.ref();
-    extendedReadBuffers.put(buffer, clientMmap);
-    readStatistics.addZeroCopyBytes(length);
-    if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("readZeroCopy read " + maxLength + " bytes from " +
-          "offset " + curPos + " via the zero-copy read path.  " +
-          "blockEnd = " + blockEnd);
+    boolean success = false;
+    ByteBuffer buffer;
+    try {
+      seek(pos + length);
+      buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
+      buffer.position((int)blockPos);
+      buffer.limit((int)limit);
+      extendedReadBuffers.put(buffer, clientMmap);
+      readStatistics.addZeroCopyBytes(length);
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("readZeroCopy read " + maxLength + " bytes from " +
+            "offset " + curPos + " via the zero-copy read path.  " +
+            "blockEnd = " + blockEnd);
+      }
+      success = true;
+    } finally {
+      if (!success) {
+        clientMmap.unref();
+      }
     }
     return buffer;
   }
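
The restructured readZeroCopy above is an instance of a success-flag cleanup idiom: take a counted reference, attempt the fallible steps, and release the reference only if the attempt did not complete. The generic shape, with Resource and doWork() as hypothetical stand-ins for ClientMmap and the buffer setup:

    // Generic form of the success-flag idiom used above (illustration only).
    interface Resource { void release(); }

    void guarded(Resource r) {        // r arrives holding one reference
      boolean success = false;
      try {
        doWork(r);                    // may throw at any point
        success = true;
      } finally {
        if (!success) {
          r.release();                // mirrors clientMmap.unref() above
        }
      }
    }

    void doWork(Resource r) { /* fallible work */ }
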

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java Thu Feb 13 18:21:46 2014
@@ -27,29 +27,71 @@ import org.apache.hadoop.HadoopIllegalAr
 import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.net.unix.DomainSocket;
 
+import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 
 class DomainSocketFactory {
   private static final Log LOG = BlockReaderLocal.LOG;
-  private final Conf conf;
 
-  enum PathStatus {
-    UNUSABLE,
-    SHORT_CIRCUIT_DISABLED,
+  public enum PathState {
+    UNUSABLE(false, false),
+    SHORT_CIRCUIT_DISABLED(true, false),
+    VALID(true, true);
+
+    PathState(boolean usableForDataTransfer, boolean usableForShortCircuit) {
+      this.usableForDataTransfer = usableForDataTransfer;
+      this.usableForShortCircuit = usableForShortCircuit;
+    }
+
+    public boolean getUsableForDataTransfer() {
+      return usableForDataTransfer;
+    }
+
+    public boolean getUsableForShortCircuit() {
+      return usableForShortCircuit;
+    }
+
+    private final boolean usableForDataTransfer;
+    private final boolean usableForShortCircuit;
+  }
+
+  public static class PathInfo {
+    private final static PathInfo NOT_CONFIGURED =
+          new PathInfo("", PathState.UNUSABLE);
+
+    final private String path;
+    final private PathState state;
+
+    PathInfo(String path, PathState state) {
+      this.path = path;
+      this.state = state;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public PathState getPathState() {
+      return state;
+    }
+    
+    @Override
+    public String toString() {
+      return new StringBuilder().append("PathInfo{path=").append(path).
+          append(", state=").append(state).append("}").toString();
+    }
   }
 
   /**
    * Information about domain socket paths.
    */
-  Cache<String, PathStatus> pathInfo =
+  Cache<String, PathState> pathMap =
       CacheBuilder.newBuilder()
       .expireAfterWrite(10, TimeUnit.MINUTES)
       .build();
 
   public DomainSocketFactory(Conf conf) {
-    this.conf = conf;
-
     final String feature;
     if (conf.shortCircuitLocalReads && (!conf.useLegacyBlockReaderLocal)) {
       feature = "The short-circuit local reads feature";
@@ -75,51 +117,46 @@ class DomainSocketFactory {
   }
 
   /**
-   * Create a DomainSocket.
-   * 
-   * @param addr        The address of the DataNode
-   * @param stream      The DFSInputStream the socket will be created for.
+   * Get information about a domain socket path.
+   *
+   * @param addr         The inet address to use.
+   * @param conf         The client configuration.
    *
-   * @return            null if the socket could not be created; the
-   *                    socket otherwise.  If there was an error while
-   *                    creating the socket, we will add the socket path
-   *                    to our list of failed domain socket paths.
+   * @return             Information about the socket path.
    */
-  DomainSocket create(InetSocketAddress addr, DFSInputStream stream) {
+  public PathInfo getPathInfo(InetSocketAddress addr, DFSClient.Conf conf) {
     // If there is no domain socket path configured, we can't use domain
     // sockets.
-    if (conf.domainSocketPath.isEmpty()) return null;
+    if (conf.domainSocketPath.isEmpty()) return PathInfo.NOT_CONFIGURED;
     // If we can't do anything with the domain socket, don't create it.
     if (!conf.domainSocketDataTraffic &&
         (!conf.shortCircuitLocalReads || conf.useLegacyBlockReaderLocal)) {
-      return null;
+      return PathInfo.NOT_CONFIGURED;
     }
-    // UNIX domain sockets can only be used to talk to local peers
-    if (!DFSClient.isLocalAddress(addr)) return null;
     // If the DomainSocket code is not loaded, we can't create
     // DomainSocket objects.
-    if (DomainSocket.getLoadingFailureReason() != null) return null;
+    if (DomainSocket.getLoadingFailureReason() != null) {
+      return PathInfo.NOT_CONFIGURED;
+    }
+    // UNIX domain sockets can only be used to talk to local peers
+    if (!DFSClient.isLocalAddress(addr)) return PathInfo.NOT_CONFIGURED;
     String escapedPath = DomainSocket.
         getEffectivePath(conf.domainSocketPath, addr.getPort());
-    PathStatus info = pathInfo.getIfPresent(escapedPath);
-    if (info == PathStatus.UNUSABLE) {
-      // We tried to connect to this domain socket before, and it was totally
-      // unusable.
-      return null;
-    }
-    if ((!conf.domainSocketDataTraffic) &&
-        ((info == PathStatus.SHORT_CIRCUIT_DISABLED) || 
-            stream.shortCircuitForbidden())) {
-      // If we don't want to pass data over domain sockets, and we don't want
-      // to pass file descriptors over them either, we have no use for domain
-      // sockets.
-      return null;
+    PathState status = pathMap.getIfPresent(escapedPath);
+    if (status == null) {
+      return new PathInfo(escapedPath, PathState.VALID);
+    } else {
+      return new PathInfo(escapedPath, status);
     }
+  }
+
+  public DomainSocket createSocket(PathInfo info, int socketTimeout) {
+    Preconditions.checkArgument(info.getPathState() != PathState.UNUSABLE);
     boolean success = false;
     DomainSocket sock = null;
     try {
-      sock = DomainSocket.connect(escapedPath);
-      sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, conf.socketTimeout);
+      sock = DomainSocket.connect(info.getPath());
+      sock.setAttribute(DomainSocket.RECEIVE_TIMEOUT, socketTimeout);
       success = true;
     } catch (IOException e) {
       LOG.warn("error creating DomainSocket", e);
@@ -129,7 +166,7 @@ class DomainSocketFactory {
         if (sock != null) {
           IOUtils.closeQuietly(sock);
         }
-        pathInfo.put(escapedPath, PathStatus.UNUSABLE);
+        pathMap.put(info.getPath(), PathState.UNUSABLE);
         sock = null;
       }
     }
@@ -137,10 +174,10 @@ class DomainSocketFactory {
   }
 
   public void disableShortCircuitForPath(String path) {
-    pathInfo.put(path, PathStatus.SHORT_CIRCUIT_DISABLED);
+    pathMap.put(path, PathState.SHORT_CIRCUIT_DISABLED);
   }
 
   public void disableDomainSocketPath(String path) {
-    pathInfo.put(path, PathStatus.UNUSABLE);
+    pathMap.put(path, PathState.UNUSABLE);
   }
 }
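
The net effect of this rewrite is that path inspection (getPathInfo) is separated from socket creation (createSocket), so callers can decide up front what a path supports. A caller-side sketch; dsFactory, addr and conf are assumed to be in scope (for example, inside the new BlockReaderFactory):

    // Caller-side sketch of the two-step API above (assumed usage, not in this diff).
    DomainSocketFactory.PathInfo info = dsFactory.getPathInfo(addr, conf);
    if (info.getPathState().getUsableForShortCircuit()) {
      // VALID: file descriptors may be passed for short-circuit reads
      DomainSocket sock = dsFactory.createSocket(info, conf.socketTimeout);
      // sock may be null; createSocket marks the path UNUSABLE on failure
    } else if (info.getPathState().getUsableForDataTransfer()) {
      // SHORT_CIRCUIT_DISABLED: still usable for remote block traffic
    }
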

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/PeerCache.java Thu Feb 13 18:21:46 2014
@@ -89,42 +89,19 @@ class PeerCache {
     LinkedListMultimap.create();
   private final int capacity;
   private final long expiryPeriod;
-  private static PeerCache instance = null;
   
-  @VisibleForTesting
-  PeerCache(int c, long e) {
+  public PeerCache(int c, long e) {
     this.capacity = c;
     this.expiryPeriod = e;
 
     if (capacity == 0 ) {
       LOG.info("SocketCache disabled.");
-    }
-    else if (expiryPeriod == 0) {
+    } else if (expiryPeriod == 0) {
       throw new IllegalStateException("Cannot initialize expiryPeriod to " +
-         expiryPeriod + "when cache is enabled.");
+         expiryPeriod + " when cache is enabled.");
     }
   }
  
-  public static synchronized PeerCache getInstance(int c, long e) {
-    // capacity is only initialized once
-    if (instance == null) {
-      instance = new PeerCache(c, e);
-    } else { //already initialized once
-      if (instance.capacity != c || instance.expiryPeriod != e) {
-        LOG.info("capacity and expiry periods already set to " +
-          instance.capacity + " and " + instance.expiryPeriod +
-          " respectively. Cannot set it to " + c + " and " + e);
-      }
-    }
-
-    return instance;
-  }
-
-  @VisibleForTesting
-  public static synchronized void setInstance(int c, long e) {
-    instance = new PeerCache(c, e);
-  }
-
   private boolean isDaemonStarted() {
     return (daemon == null)? false: true;
   }
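
With getInstance/setInstance gone, the cache is no longer a JVM-wide singleton; each owner (per this branch, presumably the shared client context) constructs its own. A construction sketch, with illustrative parameter values:

    // Direct construction now replaces PeerCache.getInstance(); the numbers
    // here are illustrative, the real values come from DFSClient.Conf.
    PeerCache peerCache = new PeerCache(16 /* capacity */, 3000 /* expiry, ms */);
    // capacity == 0 disables the cache entirely; a zero expiry with a
    // non-zero capacity throws IllegalStateException (message fixed above).
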

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Thu Feb 13 18:21:46 2014
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.FSInputCheck
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -492,8 +491,7 @@ public class RemoteBlockReader extends F
   }
 
   @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
-        ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     return null;
   }
 }

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Thu Feb 13 18:21:46 2014
@@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -457,8 +456,7 @@ public class RemoteBlockReader2  impleme
   }
 
   @Override
-  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
-        ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     return null;
   }
 }
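
As in RemoteBlockReader, the narrowed getClientMmap contract means a remote reader signals "no mapping available" with null. A caller-side sketch of the resulting fallback, matching the readZeroCopy hunk earlier:

    // Caller-side sketch: null from getClientMmap() means no zero-copy path.
    ClientMmap mmap = blockReader.getClientMmap(opts);
    if (mmap == null) {
      // fall back to an ordinary copying read, as readZeroCopy does above
    }
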

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java Thu Feb 13 18:21:46 2014
@@ -17,24 +17,14 @@
  */
 package org.apache.hadoop.hdfs.client;
 
-import java.io.FileInputStream;
-
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.io.nativeio.NativeIO;
 
-import java.io.IOException;
-import java.lang.ref.WeakReference;
 import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel.MapMode;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * A memory-mapped region used by an HDFS client.
  * 
@@ -46,111 +36,46 @@ public class ClientMmap {
   static final Log LOG = LogFactory.getLog(ClientMmap.class);
   
   /**
-   * A reference to the manager of this mmap.
-   * 
-   * This is only a weak reference to help minimize the damange done by
-   * code which leaks references accidentally.
+   * A reference to the block replica which this mmap relates to.
    */
-  private final WeakReference<ClientMmapManager> manager;
+  private final ShortCircuitReplica replica;
   
   /**
-   * The actual mapped memory region.
+   * The java ByteBuffer object.
    */
   private final MappedByteBuffer map;
-  
-  /**
-   * A reference count tracking how many threads are using this object.
-   */
-  private final AtomicInteger refCount = new AtomicInteger(1);
 
   /**
-   * Block pertaining to this mmap
+   * Reference count of this ClientMmap object.
    */
-  private final ExtendedBlock block;
-  
-  /**
-   * The DataNode where this mmap came from.
-   */
-  private final DatanodeID datanodeID;
-
-  /**
-   * The monotonic time when this mmap was last evictable.
-   */
-  private long lastEvictableTimeNs;
-
-  public static ClientMmap load(ClientMmapManager manager, FileInputStream in, 
-      ExtendedBlock block, DatanodeID datanodeID) 
-          throws IOException {
-    MappedByteBuffer map =
-        in.getChannel().map(MapMode.READ_ONLY, 0,
-            in.getChannel().size());
-    return new ClientMmap(manager, map, block, datanodeID);
-  }
+  private final AtomicInteger refCount = new AtomicInteger(1);
 
-  private ClientMmap(ClientMmapManager manager, MappedByteBuffer map, 
-        ExtendedBlock block, DatanodeID datanodeID) 
-            throws IOException {
-    this.manager = new WeakReference<ClientMmapManager>(manager);
+  ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map) {
+    this.replica = replica;
     this.map = map;
-    this.block = block;
-    this.datanodeID = datanodeID;
-    this.lastEvictableTimeNs = 0;
   }
 
   /**
-   * Decrement the reference count on this object.
-   * Should be called with the ClientMmapManager lock held.
+   * Increment the reference count.
+   *
+   * @return   The new reference count.
    */
-  public void unref() {
-    int count = refCount.decrementAndGet();
-    if (count < 0) {
-      throw new IllegalArgumentException("can't decrement the " +
-          "reference count on this ClientMmap lower than 0.");
-    } else if (count == 0) {
-      ClientMmapManager man = manager.get();
-      if (man == null) {
-        unmap();
-      } else {
-        man.makeEvictable(this);
-      }
-    }
+  void ref() {
+    refCount.addAndGet(1);
   }
 
   /**
-   * Increment the reference count on this object.
+   * Decrement the reference count.
    *
-   * @return     The new reference count.
+   * The parent replica gets unreferenced each time the reference count 
+   * of this object goes to 0.
    */
-  public int ref() {
-    return refCount.getAndIncrement();
-  }
-
-  @VisibleForTesting
-  public ExtendedBlock getBlock() {
-    return block;
-  }
-
-  DatanodeID getDatanodeID() {
-    return datanodeID;
+  public void unref() {
+    refCount.addAndGet(-1);
+    replica.unref();
   }
 
   public MappedByteBuffer getMappedByteBuffer() {
     return map;
   }
-
-  public void setLastEvictableTimeNs(long lastEvictableTimeNs) {
-    this.lastEvictableTimeNs = lastEvictableTimeNs;
-  }
-
-  public long getLastEvictableTimeNs() {
-    return this.lastEvictableTimeNs;
-  }
-
-  /**
-   * Unmap the memory region.
-   */
-  void unmap() {
-    assert(refCount.get() == 0);
-    NativeIO.POSIX.munmap(map);
-  }
-}
+}
\ No newline at end of file
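
The slimmed-down ClientMmap no longer manages eviction or unmapping itself; it only counts references and forwards releases to its ShortCircuitReplica. A lifecycle sketch; how the replica and the MappedByteBuffer are obtained lies outside this hunk and is assumed:

    // Lifecycle sketch for the new ClientMmap (replica wiring assumed).
    ClientMmap mmap = new ClientMmap(replica, mappedByteBuffer); // refCount == 1
    mmap.ref();     // refCount == 2; the replica's count is untouched
    mmap.unref();   // refCount == 1, and replica.unref() is called (see above)
    mmap.unref();   // refCount == 0; unmapping is now the replica's concern,
                    // the old NativeIO.POSIX.munmap() path is gone
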

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java Thu Feb 13 18:21:46 2014
@@ -102,9 +102,10 @@ public class DatanodeProtocolClientSideT
   private static DatanodeProtocolPB createNamenode(
       InetSocketAddress nameNodeAddr, Configuration conf,
       UserGroupInformation ugi) throws IOException {
-    return RPC.getProxy(DatanodeProtocolPB.class,
+    return RPC.getProtocolProxy(DatanodeProtocolPB.class,
         RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi,
-        conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class));
+        conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class),
+        org.apache.hadoop.ipc.Client.getPingInterval(conf), null).getProxy();
   }
 
   /** Create a {@link NameNode} proxy */
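
Unlike RPC.getProxy, RPC.getProtocolProxy returns a ProtocolProxy wrapper, hence the trailing .getProxy() above; it also takes an explicit RPC timeout (the client ping interval here) and, in the overload this call appears to match, a connection retry policy (the final null). Spelled out:

    // The call above, unrolled for readability (same arguments, sketch only).
    ProtocolProxy<DatanodeProtocolPB> wrapped = RPC.getProtocolProxy(
        DatanodeProtocolPB.class,
        RPC.getProtocolVersion(DatanodeProtocolPB.class),
        nameNodeAddr, ugi, conf,
        NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class),
        org.apache.hadoop.ipc.Client.getPingInterval(conf), // rpc timeout
        null);                                              // retry policy
    DatanodeProtocolPB proxy = wrapped.getProxy();          // the bare proxy
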

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Thu Feb 13 18:21:46 2014
@@ -18,7 +18,26 @@
 
 package org.apache.hadoop.hdfs.server.common;
 
-import com.google.common.base.Charsets;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.jsp.JspWriter;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,10 +46,17 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -53,22 +79,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.VersionInfo;
 
-import javax.servlet.ServletContext;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.jsp.JspWriter;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.URL;
-import java.net.URLEncoder;
-import java.util.*;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+import com.google.common.base.Charsets;
 
 @InterfaceAudience.Private
 public class JspHelper {
@@ -168,101 +179,97 @@ public class JspHelper {
     }
     NodeRecord[] nodes = map.values().toArray(new NodeRecord[map.size()]);
     Arrays.sort(nodes, new NodeRecordComparator());
-    return bestNode(nodes, false, conf);
+    return bestNode(nodes, false);
   }
 
   public static DatanodeInfo bestNode(LocatedBlock blk, Configuration conf)
       throws IOException {
     DatanodeInfo[] nodes = blk.getLocations();
-    return bestNode(nodes, true, conf);
+    return bestNode(nodes, true);
   }
 
-  public static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom,
-      Configuration conf) throws IOException {
-    TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
-    DatanodeInfo chosenNode = null;
-    int failures = 0;
-    Socket s = null;
-    int index = -1;
+  private static DatanodeInfo bestNode(DatanodeInfo[] nodes, boolean doRandom)
+      throws IOException {
     if (nodes == null || nodes.length == 0) {
       throw new IOException("No nodes contain this block");
     }
-    while (s == null) {
-      if (chosenNode == null) {
-        do {
-          if (doRandom) {
-            index = DFSUtil.getRandom().nextInt(nodes.length);
-          } else {
-            index++;
-          }
-          chosenNode = nodes[index];
-        } while (deadNodes.contains(chosenNode));
-      }
-      chosenNode = nodes[index];
+    int l = 0;
+    while (l < nodes.length && !nodes[l].isDecommissioned()) {
+      ++l;
+    }
 
-      //just ping to check whether the node is alive
-      InetSocketAddress targetAddr = NetUtils.createSocketAddr(
-          chosenNode.getInfoAddr());
-        
-      try {
-        s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-        s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
-        s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-      } catch (IOException e) {
-        deadNodes.add(chosenNode);
-        IOUtils.closeSocket(s);
-        s = null;
-        failures++;
-      }
-      if (failures == nodes.length)
-        throw new IOException("Could not reach the block containing the data. Please try again");
-        
+    if (l == 0) {
+      throw new IOException("No active nodes contain this block");
     }
-    s.close();
-    return chosenNode;
+
+    int index = doRandom ? DFSUtil.getRandom().nextInt(l) : 0;
+    return nodes[index];
   }
 
   public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
       long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
-      JspWriter out, Configuration conf, DFSClient.Conf dfsConf,
-      DataEncryptionKey encryptionKey)
+      JspWriter out, final Configuration conf, DFSClient.Conf dfsConf,
+      final DataEncryptionKey encryptionKey)
           throws IOException {
     if (chunkSizeToView == 0) return;
-    Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-    s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
-    s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-      
     int amtToRead = (int)Math.min(chunkSizeToView, blockSize - offsetIntoBlock);
       
-      // Use the block name for file name. 
-    String file = BlockReaderFactory.getFileName(addr, poolId, blockId);
-    BlockReader blockReader = BlockReaderFactory.newBlockReader(dfsConf, file,
-        new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
-        offsetIntoBlock, amtToRead,  true,
-        "JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
-        new DatanodeID(addr.getAddress().getHostAddress(),
-            addr.getHostName(), poolId, addr.getPort(), 0, 0, 0), null,
-            null, null, false, CachingStrategy.newDefaultStrategy());
-        
+    BlockReader blockReader = new BlockReaderFactory(dfsConf).
+      setInetSocketAddress(addr).
+      setBlock(new ExtendedBlock(poolId, blockId, 0, genStamp)).
+      setFileName(BlockReaderFactory.getFileName(addr, poolId, blockId)).
+      setBlockToken(blockToken).
+      setStartOffset(offsetIntoBlock).
+      setLength(amtToRead).
+      setVerifyChecksum(true).
+      setClientName("JspHelper").
+      setClientCacheContext(ClientContext.getFromConf(conf)).
+      setDatanodeInfo(new DatanodeInfo(
+          new DatanodeID(addr.getAddress().getHostAddress(),
+              addr.getHostName(), poolId, addr.getPort(), 0, 0, 0))).
+      setCachingStrategy(CachingStrategy.newDefaultStrategy()).
+      setConfiguration(conf).
+      setRemotePeerFactory(new RemotePeerFactory() {
+        @Override
+        public Peer newConnectedPeer(InetSocketAddress addr)
+            throws IOException {
+          Peer peer = null;
+          Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+          try {
+            sock.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+            sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+            peer = TcpPeerServer.peerFromSocketAndKey(sock, encryptionKey);
+          } finally {
+            if (peer == null) {
+              IOUtils.closeSocket(sock);
+            }
+          }
+          return peer;
+        }
+      }).
+      build();
+
     final byte[] buf = new byte[amtToRead];
-    int readOffset = 0;
-    int retries = 2;
-    while ( amtToRead > 0 ) {
-      int numRead = amtToRead;
-      try {
-        blockReader.readFully(buf, readOffset, amtToRead);
-      }
-      catch (IOException e) {
-        retries--;
-        if (retries == 0)
-          throw new IOException("Could not read data from datanode");
-        continue;
+    try {
+      int readOffset = 0;
+      int retries = 2;
+      while (amtToRead > 0) {
+        int numRead = amtToRead;
+        try {
+          blockReader.readFully(buf, readOffset, amtToRead);
+        } catch (IOException e) {
+          retries--;
+          if (retries == 0)
+            throw new IOException("Could not read data from datanode");
+          continue;
+        }
+        amtToRead -= numRead;
+        readOffset += numRead;
       }
-      amtToRead -= numRead;
-      readOffset += numRead;
+    } finally {
+      blockReader.close();
     }
-    blockReader.close();
     out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8)));
   }
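
Two things changed in this JspHelper hunk: bestNode no longer probes nodes with throwaway sockets, instead counting the leading run of non-decommissioned locations and selecting among them; and streamBlockInAscii now builds its reader through BlockReaderFactory with an inline RemotePeerFactory. A self-contained restatement of the new selection logic, with isDecommissioned() stubbed as a boolean array:

    // Restatement of the new bestNode selection above (sketch for clarity).
    static int pickIndex(boolean[] decommissioned, boolean doRandom,
                         java.util.Random rnd) throws java.io.IOException {
      int l = 0;
      while (l < decommissioned.length && !decommissioned[l]) {
        ++l;                 // count the leading run of live nodes
      }
      if (l == 0) {
        throw new java.io.IOException("No active nodes contain this block");
      }
      return doRandom ? rnd.nextInt(l) : 0;
    }
    // pickIndex(new boolean[]{false, false, true}, true, rnd) yields 0 or 1,
    // never 2: the scan stops at the first decommissioned entry, so the
    // method relies on locations being ordered live-first.
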
 

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Thu Feb 13 18:21:46 2014
@@ -34,6 +34,8 @@ import org.apache.hadoop.util.DataChecks
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 
 /**
@@ -55,7 +57,8 @@ public class BlockMetadataHeader {
   private short version;
   private DataChecksum checksum = null;
     
-  BlockMetadataHeader(short version, DataChecksum checksum) {
+  @VisibleForTesting
+  public BlockMetadataHeader(short version, DataChecksum checksum) {
     this.checksum = checksum;
     this.version = version;
   }
@@ -148,7 +151,8 @@ public class BlockMetadataHeader {
    * @return 
    * @throws IOException
    */
-  private static void writeHeader(DataOutputStream out, 
+  @VisibleForTesting
+  public static void writeHeader(DataOutputStream out, 
                                   BlockMetadataHeader header) 
                                   throws IOException {
     out.writeShort(header.getVersion());
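
Making the constructor and writeHeader visible for testing lets a test fabricate a block metadata header directly. A hedged test-side sketch; the version value, checksum parameters, and metaFile are illustrative:

    // Test-side sketch enabled by the @VisibleForTesting changes above;
    // metaFile is a hypothetical java.io.File for the block's .meta file.
    DataChecksum checksum = DataChecksum.newDataChecksum(
        DataChecksum.Type.CRC32C, 512 /* bytesPerChecksum, illustrative */);
    BlockMetadataHeader header = new BlockMetadataHeader((short) 1, checksum);
    DataOutputStream out = new DataOutputStream(new FileOutputStream(metaFile));
    try {
      BlockMetadataHeader.writeHeader(out, header); // now callable from tests
    } finally {
      out.close();
    }
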

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Thu Feb 13 18:21:46 2014
@@ -37,12 +37,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -57,43 +57,6 @@ import org.apache.hadoop.io.nativeio.Nat
 @InterfaceStability.Unstable
 public class FsDatasetCache {
   /**
-   * Keys which identify MappableBlocks.
-   */
-  private static final class Key {
-    /**
-     * Block id.
-     */
-    final long id;
-
-    /**
-     * Block pool id.
-     */
-    final String bpid;
-
-    Key(long id, String bpid) {
-      this.id = id;
-      this.bpid = bpid;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == null) {
-        return false;
-      }
-      if (!(o.getClass() == getClass())) {
-        return false;
-      }
-      Key other = (Key)o;
-      return ((other.id == this.id) && (other.bpid.equals(this.bpid)));
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder().append(id).append(bpid).hashCode();
-    }
-  };
-
-  /**
    * MappableBlocks that we know about.
    */
   private static final class Value {
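
The removed nested Key class is superseded by the new top-level ExtendedBlockId, whose definition is not part of this hunk. From the accessors used below (getBlockId, getBlockPoolId, and toString in the reworked log messages), it must look roughly like the following sketch; the exact field names, hashing, and toString format are assumptions:

    // Assumed shape of ExtendedBlockId, inferred from its call sites below;
    // not the committed source.
    public class ExtendedBlockId {
      private final long blockId;
      private final String blockPoolId;

      public ExtendedBlockId(long blockId, String blockPoolId) {
        this.blockId = blockId;
        this.blockPoolId = blockPoolId;
      }

      public long getBlockId() { return blockId; }
      public String getBlockPoolId() { return blockPoolId; }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof ExtendedBlockId)) {
          return false;
        }
        ExtendedBlockId other = (ExtendedBlockId) o;
        return blockId == other.blockId && blockPoolId.equals(other.blockPoolId);
      }

      @Override
      public int hashCode() {
        return 31 * Long.valueOf(blockId).hashCode() + blockPoolId.hashCode();
      }

      @Override
      public String toString() {  // used directly in the log lines below
        return blockId + "_" + blockPoolId;
      }
    }
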
@@ -143,7 +106,8 @@ public class FsDatasetCache {
   /**
    * Stores MappableBlock objects and the states they're in.
    */
-  private final HashMap<Key, Value> mappableBlockMap = new HashMap<Key, Value>();
+  private final HashMap<ExtendedBlockId, Value> mappableBlockMap =
+      new HashMap<ExtendedBlockId, Value>();
 
   private final AtomicLong numBlocksCached = new AtomicLong(0);
 
@@ -260,12 +224,12 @@ public class FsDatasetCache {
    */
   synchronized List<Long> getCachedBlocks(String bpid) {
     List<Long> blocks = new ArrayList<Long>();
-    for (Iterator<Entry<Key, Value>> iter =
+    for (Iterator<Entry<ExtendedBlockId, Value>> iter =
         mappableBlockMap.entrySet().iterator(); iter.hasNext(); ) {
-      Entry<Key, Value> entry = iter.next();
-      if (entry.getKey().bpid.equals(bpid)) {
+      Entry<ExtendedBlockId, Value> entry = iter.next();
+      if (entry.getKey().getBlockPoolId().equals(bpid)) {
         if (entry.getValue().state.shouldAdvertise()) {
-          blocks.add(entry.getKey().id);
+          blocks.add(entry.getKey().getBlockId());
         }
       }
     }
@@ -278,7 +242,7 @@ public class FsDatasetCache {
   synchronized void cacheBlock(long blockId, String bpid,
       String blockFileName, long length, long genstamp,
       Executor volumeExecutor) {
-    Key key = new Key(blockId, bpid);
+    ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
     Value prevValue = mappableBlockMap.get(key);
     if (prevValue != null) {
       if (LOG.isDebugEnabled()) {
@@ -299,7 +263,7 @@ public class FsDatasetCache {
   }
 
   synchronized void uncacheBlock(String bpid, long blockId) {
-    Key key = new Key(blockId, bpid);
+    ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
     Value prevValue = mappableBlockMap.get(key);
 
     if (prevValue == null) {
@@ -344,12 +308,12 @@ public class FsDatasetCache {
    * Background worker that mmaps, mlocks, and checksums a block
    */
   private class CachingTask implements Runnable {
-    private final Key key; 
+    private final ExtendedBlockId key; 
     private final String blockFileName;
     private final long length;
     private final long genstamp;
 
-    CachingTask(Key key, String blockFileName, long length, long genstamp) {
+    CachingTask(ExtendedBlockId key, String blockFileName, long length, long genstamp) {
       this.key = key;
       this.blockFileName = blockFileName;
       this.length = length;
@@ -361,13 +325,13 @@ public class FsDatasetCache {
       boolean success = false;
       FileInputStream blockIn = null, metaIn = null;
       MappableBlock mappableBlock = null;
-      ExtendedBlock extBlk =
-          new ExtendedBlock(key.bpid, key.id, length, genstamp);
+      ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
+          key.getBlockId(), length, genstamp);
       long newUsedBytes = usedBytesCount.reserve(length);
       if (newUsedBytes < 0) {
-        LOG.warn("Failed to cache block id " + key.id + ", pool " + key.bpid +
-            ": could not reserve " + length + " more bytes in the " +
-            "cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
+        LOG.warn("Failed to cache " + key + ": could not reserve " + length +
+            " more bytes in the cache: " +
+            DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
             " of " + maxBytes + " exceeded.");
         numBlocksFailedToCache.incrementAndGet();
         return;
@@ -378,16 +342,15 @@ public class FsDatasetCache {
           metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk)
               .getWrappedStream();
         } catch (ClassCastException e) {
-          LOG.warn("Failed to cache block with id " + key.id + ", pool " +
-                key.bpid + ": Underlying blocks are not backed by files.", e);
+          LOG.warn("Failed to cache " + key +
+              ": Underlying blocks are not backed by files.", e);
           return;
         } catch (FileNotFoundException e) {
-          LOG.info("Failed to cache block with id " + key.id + ", pool " +
-                key.bpid + ": failed to find backing files.");
+          LOG.info("Failed to cache " + key + ": failed to find backing " +
+              "files.");
           return;
         } catch (IOException e) {
-          LOG.warn("Failed to cache block with id " + key.id + ", pool " +
-                key.bpid + ": failed to open file", e);
+          LOG.warn("Failed to cache " + key + ": failed to open file", e);
           return;
         }
         try {
@@ -395,11 +358,10 @@ public class FsDatasetCache {
               load(length, blockIn, metaIn, blockFileName);
         } catch (ChecksumException e) {
           // Exception message is bogus since this wasn't caused by a file read
-          LOG.warn("Failed to cache block " + key.id + " in " + key.bpid + ": " +
-                   "checksum verification failed.");
+          LOG.warn("Failed to cache " + key + ": checksum verification failed.");
           return;
         } catch (IOException e) {
-          LOG.warn("Failed to cache block " + key.id + " in " + key.bpid, e);
+          LOG.warn("Failed to cache " + key, e);
           return;
         }
         synchronized (FsDatasetCache.this) {
@@ -409,15 +371,14 @@ public class FsDatasetCache {
                                    value.state == State.CACHING_CANCELLED);
           if (value.state == State.CACHING_CANCELLED) {
             mappableBlockMap.remove(key);
-            LOG.warn("Caching of block " + key.id + " in " + key.bpid +
-                " was cancelled.");
+            LOG.warn("Caching of " + key + " was cancelled.");
             return;
           }
           mappableBlockMap.put(key, new Value(mappableBlock, State.CACHED));
         }
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Successfully cached block " + key.id + " in " + key.bpid +
-              ".  We are now caching " + newUsedBytes + " bytes in total.");
+          LOG.debug("Successfully cached " + key + ".  We are now caching " +
+              newUsedBytes + " bytes in total.");
         }
         numBlocksCached.addAndGet(1);
         success = true;
@@ -425,9 +386,8 @@ public class FsDatasetCache {
         if (!success) {
           newUsedBytes = usedBytesCount.release(length);
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Caching of block " + key.id + " in " +
-              key.bpid + " was aborted.  We are now caching only " +
-              newUsedBytes + " + bytes in total.");
+            LOG.debug("Caching of " + key + " was aborted.  We are now " +
+                "caching only " + newUsedBytes + " + bytes in total.");
           }
           IOUtils.closeQuietly(blockIn);
           IOUtils.closeQuietly(metaIn);
@@ -445,9 +405,9 @@ public class FsDatasetCache {
   }
 
   private class UncachingTask implements Runnable {
-    private final Key key; 
+    private final ExtendedBlockId key; 
 
-    UncachingTask(Key key) {
+    UncachingTask(ExtendedBlockId key) {
       this.key = key;
     }
 
@@ -470,8 +430,8 @@ public class FsDatasetCache {
           usedBytesCount.release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Uncaching of block " + key.id + " in " + key.bpid +
-            " completed.  usedBytes = " + newUsedBytes);
+        LOG.debug("Uncaching of " + key + " completed.  " +
+            "usedBytes = " + newUsedBytes);
       }
     }
   }
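
The FsDatasetCache hunks above collapse every hand-built "block with id X, pool Y" log fragment into the key's own toString(). A minimal sketch of what such a composite key needs, with illustrative names (the actual ExtendedBlockId class is defined elsewhere in this patch and may differ in detail):

    // Illustrative sketch only; not the real ExtendedBlockId.
    public final class BlockKeySketch {
      private final long blockId;       // block id within the pool
      private final String blockPoolId; // federation block pool id

      public BlockKeySketch(long blockId, String blockPoolId) {
        this.blockId = blockId;
        this.blockPoolId = blockPoolId;
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof BlockKeySketch)) {
          return false;
        }
        BlockKeySketch other = (BlockKeySketch) o;
        return blockId == other.blockId
            && blockPoolId.equals(other.blockPoolId);
      }

      @Override
      public int hashCode() {
        return 31 * (int) (blockId ^ (blockId >>> 32))
            + blockPoolId.hashCode();
      }

      // One toString() keeps every log line consistent, which is what lets
      // the hunks above shrink to LOG.warn("Failed to cache " + key, ...).
      @Override
      public String toString() {
        return blockId + "_" + blockPoolId;
      }
    }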

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java Thu Feb 13 18:21:46 2014
@@ -587,6 +587,8 @@ class ClusterJspHelper {
         toXmlItemBlockWithLink(doc, nn.host, nn.httpAddress, "NameNode");
         toXmlItemBlock(doc, "Blockpool Used",
             StringUtils.byteDesc(nn.bpUsed));
+        toXmlItemBlock(doc, "Blockpool Used%",
+            DFSUtil.percent2String(DFSUtil.getPercentUsed(nn.bpUsed, total)));
         toXmlItemBlock(doc, "Files And Directories",
             Long.toString(nn.filesAndDirectories));
         toXmlItemBlock(doc, "Blocks", Long.toString(nn.blocksCount));

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java Thu Feb 13 18:21:46 2014
@@ -49,9 +49,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.AclFeatureProto;
-import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference;
-import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
-import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
@@ -107,25 +104,6 @@ public final class FSImageFormatPBINode 
       return b.build();
     }
 
-    public static INodeReference loadINodeReference(
-        INodeSection.INodeReference r, FSDirectory dir) throws IOException {
-      long referredId = r.getReferredId();
-      INode referred = dir.getInode(referredId);
-      WithCount withCount = (WithCount) referred.getParentReference();
-      if (withCount == null) {
-        withCount = new INodeReference.WithCount(null, referred);
-      }
-      final INodeReference ref;
-      if (r.hasDstSnapshotId()) { // DstReference
-        ref = new INodeReference.DstReference(null, withCount,
-            r.getDstSnapshotId());
-      } else {
-        ref = new INodeReference.WithName(null, withCount, r.getName()
-            .toByteArray(), r.getLastSnapshotId());
-      }
-      return ref;
-    }
-
     public static INodeDirectory loadINodeDirectory(INodeSection.INode n,
         LoaderContext state) {
       assert n.getType() == INodeSection.INode.Type.DIRECTORY;
@@ -169,6 +147,8 @@ public final class FSImageFormatPBINode 
     }
 
     void loadINodeDirectorySection(InputStream in) throws IOException {
+      final List<INodeReference> refList = parent.getLoaderContext()
+          .getRefList();
       while (true) {
         INodeDirectorySection.DirEntry e = INodeDirectorySection.DirEntry
             .parseDelimitedFrom(in);
@@ -181,20 +161,13 @@ public final class FSImageFormatPBINode 
           INode child = dir.getInode(id);
           addToParent(p, child);
         }
-        for (int i = 0; i < e.getNumOfRef(); i++) {
-          INodeReference ref = loadINodeReference(in);
+        for (int refId : e.getRefChildrenList()) {
+          INodeReference ref = refList.get(refId);
           addToParent(p, ref);
         }
       }
     }
 
-    private INodeReference loadINodeReference(InputStream in)
-        throws IOException {
-      INodeSection.INodeReference ref = INodeSection.INodeReference
-          .parseDelimitedFrom(in);
-      return loadINodeReference(ref, dir);
-    }
-
     void loadINodeSection(InputStream in) throws IOException {
       INodeSection s = INodeSection.parseDelimitedFrom(in);
       fsn.resetLastInodeId(s.getLastInodeId());
@@ -379,19 +352,6 @@ public final class FSImageFormatPBINode 
       return b;
     }
 
-    public static INodeSection.INodeReference.Builder buildINodeReference(
-        INodeReference ref) throws IOException {
-      INodeSection.INodeReference.Builder rb = INodeSection.INodeReference
-          .newBuilder().setReferredId(ref.getId());
-      if (ref instanceof WithName) {
-        rb.setLastSnapshotId(((WithName) ref).getLastSnapshotId()).setName(
-            ByteString.copyFrom(ref.getLocalNameBytes()));
-      } else if (ref instanceof DstReference) {
-        rb.setDstSnapshotId(((DstReference) ref).getDstSnapshotId());
-      }
-      return rb;
-    }
-
     private final FSNamesystem fsn;
     private final FileSummary.Builder summary;
     private final SaveNamespaceContext context;
@@ -407,6 +367,8 @@ public final class FSImageFormatPBINode 
     void serializeINodeDirectorySection(OutputStream out) throws IOException {
       Iterator<INodeWithAdditionalFields> iter = fsn.getFSDirectory()
           .getINodeMap().getMapIterator();
+      final ArrayList<INodeReference> refList = parent.getSaverContext()
+          .getRefList();
       int i = 0;
       while (iter.hasNext()) {
         INodeWithAdditionalFields n = iter.next();
@@ -419,21 +381,16 @@ public final class FSImageFormatPBINode 
         if (children.size() > 0) {
           INodeDirectorySection.DirEntry.Builder b = INodeDirectorySection.
               DirEntry.newBuilder().setParent(n.getId());
-          List<INodeReference> refs = new ArrayList<INodeReference>();
           for (INode inode : children) {
             if (!inode.isReference()) {
               b.addChildren(inode.getId());
             } else {
-              refs.add(inode.asReference());
+              refList.add(inode.asReference());
+              b.addRefChildren(refList.size() - 1);
             }
           }
-          b.setNumOfRef(refs.size());
           INodeDirectorySection.DirEntry e = b.build();
           e.writeDelimitedTo(out);
-          for (INodeReference ref : refs) {
-            INodeSection.INodeReference.Builder rb = buildINodeReference(ref);
-            rb.build().writeDelimitedTo(out);
-          }
         }
 
         ++i;
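
The FSImageFormatPBINode hunks replace inline INodeReference records (preceded by a numOfRef count) with integer indices into a single shared reference list that is serialized later as its own section. A toy model of that indirection, using illustrative names rather than the actual Hadoop types:

    import java.util.ArrayList;
    import java.util.List;

    // Toy model only: "write the index, not the record" is the whole trick.
    public final class RefListSketch {
      static final class Ref {
        final long referredId;
        Ref(long id) { referredId = id; }
      }

      public static void main(String[] args) {
        List<Ref> refList = new ArrayList<Ref>();     // shared saver-side list
        List<Integer> refChildren = new ArrayList<Integer>(); // per directory

        // Saver side: mirrors refList.add(...) followed by
        // b.addRefChildren(refList.size() - 1) in the hunk above.
        refList.add(new Ref(16387L));
        refChildren.add(refList.size() - 1);

        // Loader side: mirrors refList.get(refId) in
        // loadINodeDirectorySection, which only works because the loader
        // rebuilds the list in exactly the same order it was saved.
        Ref resolved = refList.get(refChildren.get(0));
        System.out.println(resolved.referredId); // prints 16387
      }
    }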

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java Thu Feb 13 18:21:46 2014
@@ -75,10 +75,15 @@ public final class FSImageFormatProtobuf
 
   public static final class LoaderContext {
     private String[] stringTable;
+    private final ArrayList<INodeReference> refList = Lists.newArrayList();
 
     public String[] getStringTable() {
       return stringTable;
     }
+
+    public ArrayList<INodeReference> getRefList() {
+      return refList;
+    }
   }
 
   public static final class SaverContext {
@@ -111,6 +116,7 @@ public final class FSImageFormatProtobuf
         return map.entrySet();
       }
     }
+    private final ArrayList<INodeReference> refList = Lists.newArrayList();
 
     private final DeduplicationMap<String> stringMap = DeduplicationMap
         .newMap();
@@ -118,6 +124,10 @@ public final class FSImageFormatProtobuf
     public DeduplicationMap<String> getStringMap() {
       return stringMap;
     }
+
+    public ArrayList<INodeReference> getRefList() {
+      return refList;
+    }
   }
 
   public static final class Loader implements FSImageFormat.AbstractLoader {
@@ -125,7 +135,6 @@ public final class FSImageFormatProtobuf
     private final Configuration conf;
     private final FSNamesystem fsn;
     private final LoaderContext ctx;
-
     /** The MD5 sum of the loaded file */
     private MD5Hash imgDigest;
     /** The transaction ID of the last edit represented by the loaded file */
@@ -228,6 +237,9 @@ public final class FSImageFormatProtobuf
           inodeLoader.loadINodeSection(in);
         }
           break;
+        case INODE_REFRENCE:
+          snapshotLoader.loadINodeReferenceSection(in);
+          break;
         case INODE_DIR:
           inodeLoader.loadINodeDirectorySection(in);
           break;
@@ -315,9 +327,10 @@ public final class FSImageFormatProtobuf
   }
 
   public static final class Saver {
+    public static final int CHECK_CANCEL_INTERVAL = 4096;
+
     private final SaveNamespaceContext context;
     private final SaverContext saverContext;
-
     private long currentOffset = FSImageUtil.MAGIC_HEADER.length;
     private MD5Hash savedDigest;
 
@@ -326,7 +339,6 @@ public final class FSImageFormatProtobuf
     private OutputStream sectionOutputStream;
     private CompressionCodec codec;
     private OutputStream underlyingOutputStream;
-    public static final int CHECK_CANCEL_INTERVAL = 4096;
 
     Saver(SaveNamespaceContext context) {
       this.context = context;
@@ -402,6 +414,7 @@ public final class FSImageFormatProtobuf
 
       snapshotSaver.serializeSnapshotSection(sectionOutputStream);
       snapshotSaver.serializeSnapshotDiffSection(sectionOutputStream);
+      snapshotSaver.serializeINodeReferenceSection(sectionOutputStream);
     }
 
     private void saveInternal(FileOutputStream fout,
@@ -538,6 +551,7 @@ public final class FSImageFormatProtobuf
     STRING_TABLE("STRING_TABLE"),
     EXTENDED_ACL("EXTENDED_ACL"),
     INODE("INODE"),
+    INODE_REFRENCE("INODE_REFRENCE"),
     SNAPSHOT("SNAPSHOT"),
     INODE_DIR("INODE_DIR"),
     FILES_UNDERCONSTRUCTION("FILES_UNDERCONSTRUCTION"),
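
The new INODE_REFRENCE case plugs into the same section-dispatch loop as the other entries: each section is a run of delimited protobuf messages that the loader parses until parseDelimitedFrom returns null. A self-contained imitation of that read-until-null pattern, using fixed-width longs in place of protobuf messages:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Imitation only: real sections hold protobuf messages, not raw longs.
    public final class SectionReadSketch {
      // Returns null at end of input, matching parseDelimitedFrom's contract.
      static Long readOne(DataInputStream in) throws IOException {
        try {
          return in.readLong();
        } catch (EOFException e) {
          return null;
        }
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeLong(1L);
        out.writeLong(2L);

        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        List<Long> loaded = new ArrayList<Long>();
        // Mirrors the loaders above: while (true) { parse; if null, break; }
        while (true) {
          Long id = readOne(in);
          if (id == null) {
            break;
          }
          loaded.add(id);
        }
        System.out.println(loaded); // [1, 2]
      }
    }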

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Thu Feb 13 18:21:46 2014
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,9 +41,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
+import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.RemotePeerFactory;
+import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -569,11 +573,10 @@ public class NamenodeFsck {
     int failures = 0;
     InetSocketAddress targetAddr = null;
     TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
-    Socket s = null;
     BlockReader blockReader = null; 
     ExtendedBlock block = lblock.getBlock(); 
 
-    while (s == null) {
+    while (blockReader == null) {
       DatanodeInfo chosenNode;
       
       try {
@@ -593,34 +596,47 @@ public class NamenodeFsck {
         continue;
       }
       try {
-        s = NetUtils.getDefaultSocketFactory(conf).createSocket();
-        s.connect(targetAddr, HdfsServerConstants.READ_TIMEOUT);
-        s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-        
-        String file = BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(),
-            block.getBlockId());
-        blockReader = BlockReaderFactory.newBlockReader(dfs.getConf(),
-            file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
-            TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
-                getDataEncryptionKey()), chosenNode, null, null, null, 
-                false, CachingStrategy.newDropBehind());
-        
+        String file = BlockReaderFactory.getFileName(targetAddr,
+            block.getBlockPoolId(), block.getBlockId());
+        blockReader = new BlockReaderFactory(dfs.getConf()).
+            setFileName(file).
+            setBlock(block).
+            setBlockToken(lblock.getBlockToken()).
+            setStartOffset(0).
+            setLength(-1).
+            setVerifyChecksum(true).
+            setClientName("fsck").
+            setDatanodeInfo(chosenNode).
+            setInetSocketAddress(targetAddr).
+            setCachingStrategy(CachingStrategy.newDropBehind()).
+            setClientCacheContext(dfs.getClientContext()).
+            setConfiguration(namenode.conf).
+            setRemotePeerFactory(new RemotePeerFactory() {
+              @Override
+              public Peer newConnectedPeer(InetSocketAddress addr)
+                  throws IOException {
+                Peer peer = null;
+                Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
+                try {
+                  s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
+                  s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
+                  peer = TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
+                        getDataEncryptionKey());
+                } finally {
+                  if (peer == null) {
+                    IOUtils.closeQuietly(s);
+                  }
+                }
+                return peer;
+              }
+            }).
+            build();
       }  catch (IOException ex) {
         // Put chosen node into dead list, continue
         LOG.info("Failed to connect to " + targetAddr + ":" + ex);
         deadNodes.add(chosenNode);
-        if (s != null) {
-          try {
-            s.close();
-          } catch (IOException iex) {
-          }
-        }
-        s = null;
       }
     }
-    if (blockReader == null) {
-      throw new Exception("Could not open data stream for " + lblock.getBlock());
-    }
     byte[] buf = new byte[1024];
     int cnt = 0;
     boolean success = true;
@@ -638,10 +654,11 @@ public class NamenodeFsck {
       LOG.error("Error reading block", e);
       success = false;
     } finally {
-      try {s.close(); } catch (Exception e1) {}
+      blockReader.close();
     }
-    if (!success)
+    if (!success) {
       throw new Exception("Could not copy block data for " + lblock.getBlock());
+    }
   }
       
   /*
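
In the rewritten NamenodeFsck read path, the anonymous RemotePeerFactory owns the raw socket only until the Peer wrapper exists; the finally block closes it on any failure before the hand-off. A minimal sketch of that idiom with a stand-in wrapper type (Peer and TcpPeerServer are the real players above):

    import java.io.Closeable;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    // Sketch of "close on failure, hand off on success"; not the HDFS types.
    public final class HandoffSketch {
      static final class Wrapper implements Closeable {
        private final Socket s;
        Wrapper(Socket s) { this.s = s; }
        public void close() throws IOException { s.close(); }
      }

      static Wrapper connect(InetSocketAddress addr, int timeoutMs)
          throws IOException {
        Wrapper wrapper = null;
        Socket s = new Socket();
        try {
          s.connect(addr, timeoutMs);
          s.setSoTimeout(timeoutMs);
          wrapper = new Wrapper(s); // ownership transfers here
          return wrapper;
        } finally {
          if (wrapper == null) {
            try {
              s.close(); // a failure before hand-off must not leak the socket
            } catch (IOException ignored) {
              // best-effort close, mirroring IOUtils.closeQuietly above
            }
          }
        }
      }
    }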

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java?rev=1567994&r1=1567993&r2=1567994&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java Thu Feb 13 18:21:46 2014
@@ -18,12 +18,10 @@
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Loader.loadINodeDirectory;
-import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Loader.loadINodeReference;
 import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Loader.loadPermission;
 import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Loader.updateBlocksMap;
 import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Saver.buildINodeDirectory;
 import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Saver.buildINodeFile;
-import static org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode.Saver.buildINodeReference;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -43,8 +41,10 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatPBINode;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
+import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SectionName;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeReferenceSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection;
 import org.apache.hadoop.hdfs.server.namenode.FsImageProto.SnapshotDiffSection.CreatedListEntry;
@@ -57,6 +57,9 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
 import org.apache.hadoop.hdfs.server.namenode.INodeMap;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
+import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
 import org.apache.hadoop.hdfs.server.namenode.INodeWithAdditionalFields;
 import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceContext;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
@@ -78,7 +81,6 @@ public class FSImageFormatPBSnapshot {
     private final FSImageFormatProtobuf.Loader parent;
     private final Map<Integer, Snapshot> snapshotMap;
 
-
     public Loader(FSNamesystem fsn, FSImageFormatProtobuf.Loader parent) {
       this.fsn = fsn;
       this.fsDir = fsn.getFSDirectory();
@@ -87,6 +89,43 @@ public class FSImageFormatPBSnapshot {
     }
 
     /**
+     * The sequence of the ref nodes in refList must be exactly the same as
+     * their sequence in the fsimage.
+     */
+    public void loadINodeReferenceSection(InputStream in) throws IOException {
+      final List<INodeReference> refList = parent.getLoaderContext()
+          .getRefList();
+      while (true) {
+        INodeReferenceSection.INodeReference e = INodeReferenceSection
+            .INodeReference.parseDelimitedFrom(in);
+        if (e == null) {
+          break;
+        }
+        INodeReference ref = loadINodeReference(e);
+        refList.add(ref);
+      }
+    }
+
+    private INodeReference loadINodeReference(
+        INodeReferenceSection.INodeReference r) throws IOException {
+      long referredId = r.getReferredId();
+      INode referred = fsDir.getInode(referredId);
+      WithCount withCount = (WithCount) referred.getParentReference();
+      if (withCount == null) {
+        withCount = new INodeReference.WithCount(null, referred);
+      }
+      final INodeReference ref;
+      if (r.hasDstSnapshotId()) { // DstReference
+        ref = new INodeReference.DstReference(null, withCount,
+            r.getDstSnapshotId());
+      } else {
+        ref = new INodeReference.WithName(null, withCount, r.getName()
+            .toByteArray(), r.getLastSnapshotId());
+      }
+      return ref;
+    }
+
+    /**
      * Load the snapshots section from fsimage. Also convert snapshottable
      * directories into {@link INodeDirectorySnapshottable}.
      *
@@ -134,6 +173,8 @@ public class FSImageFormatPBSnapshot {
      * Load the snapshot diff section from fsimage.
      */
     public void loadSnapshotDiffSection(InputStream in) throws IOException {
+      final List<INodeReference> refList = parent.getLoaderContext()
+          .getRefList();
       while (true) {
         SnapshotDiffSection.DiffEntry entry = SnapshotDiffSection.DiffEntry
             .parseDelimitedFrom(in);
@@ -148,7 +189,8 @@ public class FSImageFormatPBSnapshot {
           loadFileDiffList(in, inode.asFile(), entry.getNumOfDiff());
           break;
         case DIRECTORYDIFF:
-          loadDirectoryDiffList(in, inode.asDirectory(), entry.getNumOfDiff());
+          loadDirectoryDiffList(in, inode.asDirectory(), entry.getNumOfDiff(),
+              refList);
           break;
         }
       }
@@ -209,13 +251,13 @@ public class FSImageFormatPBSnapshot {
 
     /**
      * Load the deleted list in a DirectoryDiff
-     * @param totalSize the total size of the deleted list
-     * @param deletedNodes non-reference inodes in the deleted list. These
-     *        inodes' ids are directly recorded in protobuf
      */
-    private List<INode> loadDeletedList(InputStream in, INodeDirectory dir,
-        int refNum, List<Long> deletedNodes) throws IOException {
-      List<INode> dlist = new ArrayList<INode>(refNum + deletedNodes.size());
+    private List<INode> loadDeletedList(final List<INodeReference> refList,
+        InputStream in, INodeDirectory dir, List<Long> deletedNodes,
+        List<Integer> deletedRefNodes)
+        throws IOException {
+      List<INode> dlist = new ArrayList<INode>(deletedRefNodes.size()
+          + deletedNodes.size());
       // load non-reference inodes
       for (long deletedId : deletedNodes) {
         INode deleted = fsDir.getInode(deletedId);
@@ -223,13 +265,12 @@ public class FSImageFormatPBSnapshot {
         addToDeletedList(deleted, dir);
       }
       // load reference nodes in the deleted list
-      for (int r = 0; r < refNum; r++) {
-        INodeSection.INodeReference ref = INodeSection.INodeReference
-            .parseDelimitedFrom(in);
-        INodeReference refNode = loadINodeReference(ref, fsDir);
-        dlist.add(refNode);
-        addToDeletedList(refNode, dir);
+      for (int refId : deletedRefNodes) {
+        INodeReference deletedRef = refList.get(refId);
+        dlist.add(deletedRef);
+        addToDeletedList(deletedRef, dir);
       }
+
       Collections.sort(dlist, new Comparator<INode>() {
         @Override
         public int compare(INode n1, INode n2) {
@@ -241,7 +282,7 @@ public class FSImageFormatPBSnapshot {
 
     /** Load DirectoryDiff list for a directory with snapshot feature */
     private void loadDirectoryDiffList(InputStream in, INodeDirectory dir,
-        int size) throws IOException {
+        int size, final List<INodeReference> refList) throws IOException {
       if (!dir.isWithSnapshot()) {
         dir.addSnapshotFeature(null);
       }
@@ -284,8 +325,8 @@ public class FSImageFormatPBSnapshot {
         List<INode> clist = loadCreatedList(in, dir,
             diffInPb.getCreatedListSize());
         // load deleted list
-        List<INode> dlist = loadDeletedList(in, dir,
-            diffInPb.getNumOfDeletedRef(), diffInPb.getDeletedINodeList());
+        List<INode> dlist = loadDeletedList(refList, in, dir,
+            diffInPb.getDeletedINodeList(), diffInPb.getDeletedINodeRefList());
         // create the directory diff
         DirectoryDiff diff = new DirectoryDiff(snapshotId, copy, null,
             childrenSize, clist, dlist, useRoot);
@@ -304,7 +345,8 @@ public class FSImageFormatPBSnapshot {
     private final SaveNamespaceContext context;
 
     public Saver(FSImageFormatProtobuf.Saver parent,
-        FileSummary.Builder headers, SaveNamespaceContext context, FSNamesystem fsn) {
+        FileSummary.Builder headers, SaveNamespaceContext context,
+        FSNamesystem fsn) {
       this.parent = parent;
       this.headers = headers;
       this.context = context;
@@ -350,11 +392,41 @@ public class FSImageFormatPBSnapshot {
     }
 
     /**
+     * This can only be called after serializing both the INode_Dir and
+     * SnapshotDiff sections, since it writes out the reference list that
+     * those sections populate.
+     */
+    public void serializeINodeReferenceSection(OutputStream out)
+        throws IOException {
+      final List<INodeReference> refList = parent.getSaverContext()
+          .getRefList();
+      for (INodeReference ref : refList) {
+        INodeReferenceSection.INodeReference.Builder rb = buildINodeReference(ref);
+        rb.build().writeDelimitedTo(out);
+      }
+      parent.commitSection(headers, SectionName.INODE_REFRENCE);
+    }
+
+    private INodeReferenceSection.INodeReference.Builder buildINodeReference(
+        INodeReference ref) throws IOException {
+      INodeReferenceSection.INodeReference.Builder rb =
+          INodeReferenceSection.INodeReference.newBuilder().
+            setReferredId(ref.getId());
+      if (ref instanceof WithName) {
+        rb.setLastSnapshotId(((WithName) ref).getLastSnapshotId()).setName(
+            ByteString.copyFrom(ref.getLocalNameBytes()));
+      } else if (ref instanceof DstReference) {
+        rb.setDstSnapshotId(((DstReference) ref).getDstSnapshotId());
+      }
+      return rb;
+    }
+
+    /**
      * save all the snapshot diff to fsimage
      */
     public void serializeSnapshotDiffSection(OutputStream out)
         throws IOException {
       INodeMap inodesMap = fsn.getFSDirectory().getINodeMap();
+      final List<INodeReference> refList = parent.getSaverContext()
+          .getRefList();
       int i = 0;
       Iterator<INodeWithAdditionalFields> iter = inodesMap.getMapIterator();
       while (iter.hasNext()) {
@@ -362,7 +434,7 @@ public class FSImageFormatPBSnapshot {
         if (inode.isFile()) {
           serializeFileDiffList(inode.asFile(), out);
         } else if (inode.isDirectory()) {
-          serializeDirDiffList(inode.asDirectory(), out);
+          serializeDirDiffList(inode.asDirectory(), refList, out);
         }
         ++i;
         if (i % FSImageFormatProtobuf.Saver.CHECK_CANCEL_INTERVAL == 0) {
@@ -397,22 +469,18 @@ public class FSImageFormatPBSnapshot {
       }
     }
 
-    private void saveCreatedDeletedList(List<INode> created,
-        List<INodeReference> deletedRefs, OutputStream out) throws IOException {
+    private void saveCreatedList(List<INode> created, OutputStream out)
+        throws IOException {
       // local names of the created list member
       for (INode c : created) {
         SnapshotDiffSection.CreatedListEntry.newBuilder()
             .setName(ByteString.copyFrom(c.getLocalNameBytes())).build()
             .writeDelimitedTo(out);
       }
-      // reference nodes in deleted list
-      for (INodeReference ref : deletedRefs) {
-        INodeSection.INodeReference.Builder rb = buildINodeReference(ref);
-        rb.build().writeDelimitedTo(out);
-      }
     }
 
-    private void serializeDirDiffList(INodeDirectory dir, OutputStream out)
+    private void serializeDirDiffList(INodeDirectory dir,
+        final List<INodeReference> refList, OutputStream out)
         throws IOException {
       DirectoryWithSnapshotFeature sf = dir.getDirectoryWithSnapshotFeature();
       if (sf != null) {
@@ -438,17 +506,16 @@ public class FSImageFormatPBSnapshot {
               .getList(ListType.CREATED);
           db.setCreatedListSize(created.size());
           List<INode> deleted = diff.getChildrenDiff().getList(ListType.DELETED);
-          List<INodeReference> refs = new ArrayList<INodeReference>();
           for (INode d : deleted) {
             if (d.isReference()) {
-              refs.add(d.asReference());
+              refList.add(d.asReference());
+              db.addDeletedINodeRef(refList.size() - 1);
             } else {
               db.addDeletedINode(d.getId());
             }
           }
-          db.setNumOfDeletedRef(refs.size());
           db.build().writeDelimitedTo(out);
-          saveCreatedDeletedList(created, refs, out);
+          saveCreatedList(created, out);
         }
       }
     }
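
The loadINodeReference hunk above reuses a single WithCount wrapper per referred inode: the first reference constructs it, and later references find it again through getParentReference(). A hedged sketch of that sharing, with a plain map standing in for the parent-reference lookup:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: the real code finds the shared wrapper through
    // referred.getParentReference() rather than an explicit map.
    public final class SharedWrapperSketch {
      static final class WithCount {
        final long referredId;
        int referenceCount;
        WithCount(long referredId) { this.referredId = referredId; }
      }

      private final Map<Long, WithCount> byReferredId =
          new HashMap<Long, WithCount>();

      WithCount reference(long referredId) {
        WithCount wc = byReferredId.get(referredId);
        if (wc == null) { // the first reference creates the shared wrapper
          wc = new WithCount(referredId);
          byReferredId.put(referredId, wc);
        }
        wc.referenceCount++; // later references share and bump the count
        return wc;
      }
    }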


