hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject svn commit: r1575457 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/qjournal/server/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apach...
Date Sat, 08 Mar 2014 00:39:24 GMT
Author: wang
Date: Sat Mar  8 00:39:23 2014
New Revision: 1575457

URL: http://svn.apache.org/r1575457
Log:
HDFS-3405. Checkpointing should use HTTP POST or PUT instead of GET-GET to send merged fsimages. Contributed by Vinayakumar B.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java   (with props)
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1575457&r1=1575456&r2=1575457&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Mar  8 00:39:23 2014
@@ -122,6 +122,9 @@ Trunk (Unreleased)
 
     HDFS-5138. Support HDFS upgrade in HA. (atm via todd)
 
+    HDFS-3405. Checkpointing should use HTTP POST or PUT instead of GET-GET
+    to send merged fsimages. (Vinayakumar B via wang)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1575457&r1=1575456&r2=1575457&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Sat Mar  8 00:39:23 2014
@@ -460,7 +460,11 @@ public class DFSConfigKeys extends Commo
 
   // Image transfer timeout
   public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
-  public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 10 * 60 * 1000;
+  public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
+
+  // Image transfer chunksize
+  public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
+  public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
 
   //Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java?rev=1575457&r1=1575456&r2=1575457&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java Sat Mar  8 00:39:23 2014
@@ -43,7 +43,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
-import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
+import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
 import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -205,15 +205,16 @@ public class GetJournalEditServlet exten
           return;
         }
         editFile = elf.getFile();
-        GetImageServlet.setVerificationHeaders(response, editFile);
-        GetImageServlet.setFileNameHeaders(response, editFile);
+        ImageServlet.setVerificationHeadersForGet(response, editFile);
+        ImageServlet.setFileNameHeaders(response, editFile);
         editFileIn = new FileInputStream(editFile);
       }
       
-      DataTransferThrottler throttler = GetImageServlet.getThrottler(conf);
+      DataTransferThrottler throttler = ImageServlet.getThrottler(conf);
 
       // send edits
-      TransferFsImage.getFileServer(response, editFile, editFileIn, throttler);
+      TransferFsImage.copyFileToStream(response.getOutputStream(), editFile,
+          editFileIn, throttler);
 
     } catch (Throwable t) {
       String errMsg = "getedit failed. " + StringUtils.stringifyException(t);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1575457&r1=1575456&r2=1575457&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Sat Mar  8 00:39:23 2014
@@ -263,8 +263,7 @@ class Checkpointer extends Daemon {
     }
 
     if(cpCmd.needToReturnImage()) {
-      TransferFsImage.uploadImageFromStorage(
-          backupNode.nnHttpAddress, getImageListenAddress(),
+      TransferFsImage.uploadImageFromStorage(backupNode.nnHttpAddress, conf,
           bnStorage, NameNodeFile.IMAGE, txid);
     }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java?rev=1575457&r1=1575456&r2=1575457&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java Sat Mar  8 00:39:23 2014
@@ -1,481 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import static org.apache.hadoop.util.Time.now;
-
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.io.*;
-import java.net.InetSocketAddress;
-import java.net.URL;
-
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HAUtil;
-import org.apache.hadoop.hdfs.server.common.JspHelper;
-import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
-import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
-import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.hdfs.util.MD5FileUtils;
-import org.apache.hadoop.http.HttpServer2;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ServletUtil;
-import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.net.InetAddresses;
-
-/**
- * This class is used in Namesystem's jetty to retrieve a file.
- * Typically used by the Secondary NameNode to retrieve image and
- * edit file for periodic checkpointing.
- */
-@InterfaceAudience.Private
-public class GetImageServlet extends HttpServlet {
-  private static final long serialVersionUID = -7669068179452648952L;
-
-  private static final Log LOG = LogFactory.getLog(GetImageServlet.class);
-
-  public final static String CONTENT_DISPOSITION = "Content-Disposition";
-  public final static String HADOOP_IMAGE_EDITS_HEADER = "X-Image-Edits-Name";
-  
-  private static final String TXID_PARAM = "txid";
-  private static final String START_TXID_PARAM = "startTxId";
-  private static final String END_TXID_PARAM = "endTxId";
-  private static final String STORAGEINFO_PARAM = "storageInfo";
-  private static final String LATEST_FSIMAGE_VALUE = "latest";
-  private static final String IMAGE_FILE_TYPE = "imageFile";
-
-  private static Set<Long> currentlyDownloadingCheckpoints =
-    Collections.<Long>synchronizedSet(new HashSet<Long>());
-  
-  @Override
-  public void doGet(final HttpServletRequest request,
-                    final HttpServletResponse response
-                    ) throws ServletException, IOException {
-    try {
-      final ServletContext context = getServletContext();
-      final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
-      final GetImageParams parsedParams = new GetImageParams(request, response);
-      final Configuration conf = (Configuration) context
-          .getAttribute(JspHelper.CURRENT_CONF);
-      final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
-      
-      if (UserGroupInformation.isSecurityEnabled() && 
-          !isValidRequestor(context, request.getUserPrincipal().getName(), conf)) {
-        response.sendError(HttpServletResponse.SC_FORBIDDEN, 
-            "Only Namenode, Secondary Namenode, and administrators may access " +
-            "this servlet");
-        LOG.warn("Received non-NN/SNN/administrator request for image or edits from " 
-            + request.getUserPrincipal().getName() + " at " + request.getRemoteHost());
-        return;
-      }
-      
-      String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
-      String theirStorageInfoString = parsedParams.getStorageInfoString();
-      if (theirStorageInfoString != null &&
-          !myStorageInfoString.equals(theirStorageInfoString)) {
-        response.sendError(HttpServletResponse.SC_FORBIDDEN,
-            "This namenode has storage info " + myStorageInfoString + 
-            " but the secondary expected " + theirStorageInfoString);
-        LOG.warn("Received an invalid request file transfer request " +
-            "from a secondary with storage info " + theirStorageInfoString);
-        return;
-      }
-      
-      UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
-        @Override
-        public Void run() throws Exception {
-          if (parsedParams.isGetImage()) {
-            long txid = parsedParams.getTxId();
-            File imageFile = null;
-            String errorMessage = "Could not find image";
-            if (parsedParams.shouldFetchLatest()) {
-              imageFile = nnImage.getStorage().getHighestFsImageName();
-            } else {
-              errorMessage += " with txid " + txid;
-              imageFile = nnImage.getStorage().getFsImage(txid,
-                  EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK));
-            }
-            if (imageFile == null) {
-              throw new IOException(errorMessage);
-            }
-            CheckpointFaultInjector.getInstance().beforeGetImageSetsHeaders();
-            long start = now();
-            serveFile(imageFile);
-
-            if (metrics != null) { // Metrics non-null only when used inside name node
-              long elapsed = now() - start;
-              metrics.addGetImage(elapsed);
-            }
-          } else if (parsedParams.isGetEdit()) {
-            long startTxId = parsedParams.getStartTxId();
-            long endTxId = parsedParams.getEndTxId();
-            
-            File editFile = nnImage.getStorage()
-                .findFinalizedEditsFile(startTxId, endTxId);
-            long start = now();
-            serveFile(editFile);
-
-            if (metrics != null) { // Metrics non-null only when used inside name node
-              long elapsed = now() - start;
-              metrics.addGetEdit(elapsed);
-            }
-          } else if (parsedParams.isPutImage()) {
-            final long txid = parsedParams.getTxId();
-            final NameNodeFile nnf = parsedParams.getNameNodeFile();
-
-            if (! currentlyDownloadingCheckpoints.add(txid)) {
-              response.sendError(HttpServletResponse.SC_CONFLICT,
-                  "Another checkpointer is already in the process of uploading a" +
-                  " checkpoint made at transaction ID " + txid);
-              return null;
-            }
-
-            try {
-              if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
-                response.sendError(HttpServletResponse.SC_CONFLICT,
-                    "Another checkpointer already uploaded an checkpoint " +
-                    "for txid " + txid);
-                return null;
-              }
-              
-              // We may have lost our ticket since last checkpoint, log in again, just in case
-              if (UserGroupInformation.isSecurityEnabled()) {
-                UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
-              }
-              
-              long start = now();
-              // issue a HTTP get request to download the new fsimage 
-              MD5Hash downloadImageDigest = TransferFsImage
-                  .downloadImageToStorage(parsedParams.getInfoServer(conf),
-                      txid, nnImage.getStorage(), true);
-              nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
-                  downloadImageDigest);
-              if (nnf == NameNodeFile.IMAGE_ROLLBACK) {
-                    NameNodeHttpServer.getNameNodeFromContext(context)
-                        .getNamesystem().setCreatedRollbackImages(true);
-              }
-
-              if (metrics != null) { // Metrics non-null only when used inside name node
-                long elapsed = now() - start;
-                metrics.addPutImage(elapsed);
-              }
-              
-              // Now that we have a new checkpoint, we might be able to
-              // remove some old ones.
-              nnImage.purgeOldStorage(nnf);
-            } finally {
-              currentlyDownloadingCheckpoints.remove(txid);
-            }
-          }
-          return null;
-        }
-
-        private void serveFile(File file) throws IOException {
-          FileInputStream fis = new FileInputStream(file);
-          try {
-            setVerificationHeaders(response, file);
-            setFileNameHeaders(response, file);
-            if (!file.exists()) {
-              // Potential race where the file was deleted while we were in the
-              // process of setting headers!
-              throw new FileNotFoundException(file.toString());
-              // It's possible the file could be deleted after this point, but
-              // we've already opened the 'fis' stream.
-              // It's also possible length could change, but this would be
-              // detected by the client side as an inaccurate length header.
-            }
-            // send file
-            TransferFsImage.getFileServer(response, file, fis,
-                getThrottler(conf));
-          } finally {
-            IOUtils.closeStream(fis);
-          }
-        }
-      });
-      
-    } catch (Throwable t) {
-      String errMsg = "GetImage failed. " + StringUtils.stringifyException(t);
-      response.sendError(HttpServletResponse.SC_GONE, errMsg);
-      throw new IOException(errMsg);
-    } finally {
-      response.getOutputStream().close();
-    }
-  }
-  
-  public static void setFileNameHeaders(HttpServletResponse response,
-      File file) {
-    response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" +
-        file.getName());
-    response.setHeader(HADOOP_IMAGE_EDITS_HEADER, file.getName());
-  }
-  
-  /**
-   * Construct a throttler from conf
-   * @param conf configuration
-   * @return a data transfer throttler
-   */
-  public final static DataTransferThrottler getThrottler(Configuration conf) {
-    long transferBandwidth = 
-      conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
-                   DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
-    DataTransferThrottler throttler = null;
-    if (transferBandwidth > 0) {
-      throttler = new DataTransferThrottler(transferBandwidth);
-    }
-    return throttler;
-  }
-  
-  @VisibleForTesting
-  static boolean isValidRequestor(ServletContext context, String remoteUser,
-      Configuration conf) throws IOException {
-    if(remoteUser == null) { // This really shouldn't happen...
-      LOG.warn("Received null remoteUser while authorizing access to getImage servlet");
-      return false;
-    }
-    
-    Set<String> validRequestors = new HashSet<String>();
-
-    validRequestors.add(
-        SecurityUtil.getServerPrincipal(conf
-            .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY), NameNode
-            .getAddress(conf).getHostName()));
-    validRequestors.add(
-        SecurityUtil.getServerPrincipal(conf
-            .get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
-            SecondaryNameNode.getHttpAddress(conf).getHostName()));
-
-    if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) {
-      Configuration otherNnConf = HAUtil.getConfForOtherNode(conf);
-      validRequestors.add(
-          SecurityUtil.getServerPrincipal(otherNnConf
-              .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
-              NameNode.getAddress(otherNnConf).getHostName()));
-    }
-
-    for(String v : validRequestors) {
-      if(v != null && v.equals(remoteUser)) {
-        LOG.info("GetImageServlet allowing checkpointer: " + remoteUser);
-        return true;
-      }
-    }
-    
-    if (HttpServer2.userHasAdministratorAccess(context, remoteUser)) {
-      LOG.info("GetImageServlet allowing administrator: " + remoteUser);
-      return true;
-    }
-    
-    LOG.info("GetImageServlet rejecting: " + remoteUser);
-    return false;
-  }
-  
-  /**
-   * Set headers for content length, and, if available, md5.
-   * @throws IOException 
-   */
-  public static void setVerificationHeaders(HttpServletResponse response, File file)
-  throws IOException {
-    response.setHeader(TransferFsImage.CONTENT_LENGTH,
-        String.valueOf(file.length()));
-    MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
-    if (hash != null) {
-      response.setHeader(TransferFsImage.MD5_HEADER, hash.toString());
-    }
-  }
-  
-  static String getParamStringForMostRecentImage() {
-    return "getimage=1&" + TXID_PARAM + "=" + LATEST_FSIMAGE_VALUE;
-  }
-
-  static String getParamStringForImage(NameNodeFile nnf, long txid,
-      StorageInfo remoteStorageInfo) {
-    final String imageType = nnf == null ? "" : "&" + IMAGE_FILE_TYPE + "="
-        + nnf.name();
-    return "getimage=1&" + TXID_PARAM + "=" + txid
-      + imageType
-      + "&" + STORAGEINFO_PARAM + "=" +
-      remoteStorageInfo.toColonSeparatedString();
-  }
-
-  static String getParamStringForLog(RemoteEditLog log,
-      StorageInfo remoteStorageInfo) {
-    return "getedit=1&" + START_TXID_PARAM + "=" + log.getStartTxId()
-        + "&" + END_TXID_PARAM + "=" + log.getEndTxId()
-        + "&" + STORAGEINFO_PARAM + "=" +
-          remoteStorageInfo.toColonSeparatedString();
-  }
-  
-  static String getParamStringToPutImage(NameNodeFile nnf, long txid,
-      URL url, Storage storage) {
-    InetSocketAddress imageListenAddress = NetUtils.createSocketAddr(url
-        .getAuthority());
-    String machine = !imageListenAddress.isUnresolved()
-        && imageListenAddress.getAddress().isAnyLocalAddress() ? null
-        : imageListenAddress.getHostName();
-    return "putimage=1" +
-      "&" + TXID_PARAM + "=" + txid +
-      "&" + IMAGE_FILE_TYPE + "=" + nnf.name() +
-      "&port=" + imageListenAddress.getPort() +
-      (machine != null ? "&machine=" + machine : "")
-      + "&" + STORAGEINFO_PARAM + "=" +
-      storage.toColonSeparatedString();
-  }
-
-  
-  static class GetImageParams {
-    private boolean isGetImage;
-    private boolean isGetEdit;
-    private boolean isPutImage;
-    private int remoteport;
-    private String machineName;
-    private NameNodeFile nnf;
-    private long startTxId, endTxId, txId;
-    private String storageInfoString;
-    private boolean fetchLatest;
-
-    /**
-     * @param request the object from which this servlet reads the url contents
-     * @param response the object into which this servlet writes the url contents
-     * @throws IOException if the request is bad
-     */
-    public GetImageParams(HttpServletRequest request,
-                          HttpServletResponse response
-                           ) throws IOException {
-      @SuppressWarnings("unchecked")
-      Map<String, String[]> pmap = request.getParameterMap();
-      isGetImage = isGetEdit = isPutImage = fetchLatest = false;
-      remoteport = 0;
-
-      for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
-        String key = entry.getKey();
-        String[] val = entry.getValue();
-        if (key.equals("getimage")) { 
-          isGetImage = true;
-          try {
-            txId = ServletUtil.parseLongParam(request, TXID_PARAM);
-            String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
-            nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
-                .valueOf(imageType);
-          } catch (NumberFormatException nfe) {
-            if (request.getParameter(TXID_PARAM).equals(LATEST_FSIMAGE_VALUE)) {
-              fetchLatest = true;
-            } else {
-              throw nfe;
-            }
-          }
-        } else if (key.equals("getedit")) { 
-          isGetEdit = true;
-          startTxId = ServletUtil.parseLongParam(request, START_TXID_PARAM);
-          endTxId = ServletUtil.parseLongParam(request, END_TXID_PARAM);
-        } else if (key.equals("putimage")) { 
-          isPutImage = true;
-          txId = ServletUtil.parseLongParam(request, TXID_PARAM);
-          String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
-          nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
-              .valueOf(imageType);
-        } else if (key.equals("port")) { 
-          remoteport = new Integer(val[0]).intValue();
-        } else if (key.equals("machine")) {
-          machineName = val[0];
-        } else if (key.equals(STORAGEINFO_PARAM)) {
-          storageInfoString = val[0];
-        }
-      }
-
-      if (machineName == null) {
-        machineName = request.getRemoteHost();
-        if (InetAddresses.isInetAddress(machineName)) {
-          machineName = NetUtils.getHostNameOfIP(machineName);
-        }
-      }
-
-      int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
-      if ((numGets > 1) || (numGets == 0) && !isPutImage) {
-        throw new IOException("Illegal parameters to TransferFsImage");
-      }
-    }
-
-    public String getStorageInfoString() {
-      return storageInfoString;
-    }
-
-    public long getTxId() {
-      Preconditions.checkState(isGetImage || isPutImage);
-      return txId;
-    }
-
-    public NameNodeFile getNameNodeFile() {
-      Preconditions.checkState(isPutImage || isGetImage);
-      return nnf;
-    }
-
-    public long getStartTxId() {
-      Preconditions.checkState(isGetEdit);
-      return startTxId;
-    }
-    
-    public long getEndTxId() {
-      Preconditions.checkState(isGetEdit);
-      return endTxId;
-    }
-
-    boolean isGetEdit() {
-      return isGetEdit;
-    }
-
-    boolean isGetImage() {
-      return isGetImage;
-    }
-
-    boolean isPutImage() {
-      return isPutImage;
-    }
-    
-    URL getInfoServer(Configuration conf) throws IOException {
-      if (machineName == null || remoteport == 0) {
-        throw new IOException("MachineName and port undefined");
-      }
-      return new URL(DFSUtil.getHttpClientScheme(conf), machineName, remoteport, "");
-    }
-    
-    boolean shouldFetchLatest() {
-      return fetchLatest;
-    }
-    
-  }
-}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java?rev=1575457&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java Sat Mar  8 00:39:23 2014
@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.util.Time.now;
+
+import java.net.HttpURLConnection;
+import java.security.PrivilegedExceptionAction;
+import java.util.*;
+import java.io.*;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ServletUtil;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is used in Namesystem's jetty to retrieve/upload a file 
+ * Typically used by the Secondary NameNode to retrieve image and
+ * edit file for periodic checkpointing in Non-HA deployments.
+ * Standby NameNode uses to upload checkpoints in HA deployments.
+ */
+@InterfaceAudience.Private
+public class ImageServlet extends HttpServlet {
+
+  public static final String PATH_SPEC = "/imagetransfer";
+
+  private static final long serialVersionUID = -7669068179452648952L;
+
+  private static final Log LOG = LogFactory.getLog(ImageServlet.class);
+
+  public final static String CONTENT_DISPOSITION = "Content-Disposition";
+  public final static String HADOOP_IMAGE_EDITS_HEADER = "X-Image-Edits-Name";
+  
+  private static final String TXID_PARAM = "txid";
+  private static final String START_TXID_PARAM = "startTxId";
+  private static final String END_TXID_PARAM = "endTxId";
+  private static final String STORAGEINFO_PARAM = "storageInfo";
+  private static final String LATEST_FSIMAGE_VALUE = "latest";
+  private static final String IMAGE_FILE_TYPE = "imageFile";
+
+  private static Set<Long> currentlyDownloadingCheckpoints =
+    Collections.<Long>synchronizedSet(new HashSet<Long>());
+  
+  @Override
+  public void doGet(final HttpServletRequest request,
+      final HttpServletResponse response) throws ServletException, IOException {
+    try {
+      final ServletContext context = getServletContext();
+      final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
+      final GetImageParams parsedParams = new GetImageParams(request, response);
+      final Configuration conf = (Configuration) context
+          .getAttribute(JspHelper.CURRENT_CONF);
+      final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+
+      validateRequest(context, conf, request, response, nnImage,
+          parsedParams.getStorageInfoString());
+
+      UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          if (parsedParams.isGetImage()) {
+            long txid = parsedParams.getTxId();
+            File imageFile = null;
+            String errorMessage = "Could not find image";
+            if (parsedParams.shouldFetchLatest()) {
+              imageFile = nnImage.getStorage().getHighestFsImageName();
+            } else {
+              errorMessage += " with txid " + txid;
+              imageFile = nnImage.getStorage().getFsImage(txid,
+                  EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK));
+            }
+            if (imageFile == null) {
+              throw new IOException(errorMessage);
+            }
+            CheckpointFaultInjector.getInstance().beforeGetImageSetsHeaders();
+            long start = now();
+            serveFile(imageFile);
+
+            if (metrics != null) { // Metrics non-null only when used inside name node
+              long elapsed = now() - start;
+              metrics.addGetImage(elapsed);
+            }
+          } else if (parsedParams.isGetEdit()) {
+            long startTxId = parsedParams.getStartTxId();
+            long endTxId = parsedParams.getEndTxId();
+            
+            File editFile = nnImage.getStorage()
+                .findFinalizedEditsFile(startTxId, endTxId);
+            long start = now();
+            serveFile(editFile);
+
+            if (metrics != null) { // Metrics non-null only when used inside name node
+              long elapsed = now() - start;
+              metrics.addGetEdit(elapsed);
+            }
+          }
+          return null;
+        }
+
+        private void serveFile(File file) throws IOException {
+          FileInputStream fis = new FileInputStream(file);
+          try {
+            setVerificationHeadersForGet(response, file);
+            setFileNameHeaders(response, file);
+            if (!file.exists()) {
+              // Potential race where the file was deleted while we were in the
+              // process of setting headers!
+              throw new FileNotFoundException(file.toString());
+              // It's possible the file could be deleted after this point, but
+              // we've already opened the 'fis' stream.
+              // It's also possible length could change, but this would be
+              // detected by the client side as an inaccurate length header.
+            }
+            // send file
+            TransferFsImage.copyFileToStream(response.getOutputStream(),
+               file, fis, getThrottler(conf));
+          } finally {
+            IOUtils.closeStream(fis);
+          }
+        }
+      });
+      
+    } catch (Throwable t) {
+      String errMsg = "GetImage failed. " + StringUtils.stringifyException(t);
+      response.sendError(HttpServletResponse.SC_GONE, errMsg);
+      throw new IOException(errMsg);
+    } finally {
+      response.getOutputStream().close();
+    }
+  }
+
+  private void validateRequest(ServletContext context, Configuration conf,
+      HttpServletRequest request, HttpServletResponse response,
+      FSImage nnImage, String theirStorageInfoString) throws IOException {
+
+    if (UserGroupInformation.isSecurityEnabled()
+        && !isValidRequestor(context, request.getUserPrincipal().getName(),
+            conf)) {
+      String errorMsg = "Only Namenode, Secondary Namenode, and administrators may access "
+          + "this servlet";
+      response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg);
+      LOG.warn("Received non-NN/SNN/administrator request for image or edits from "
+          + request.getUserPrincipal().getName()
+          + " at "
+          + request.getRemoteHost());
+      throw new IOException(errorMsg);
+    }
+
+    String myStorageInfoString = nnImage.getStorage().toColonSeparatedString();
+    if (theirStorageInfoString != null
+        && !myStorageInfoString.equals(theirStorageInfoString)) {
+      String errorMsg = "This namenode has storage info " + myStorageInfoString
+          + " but the secondary expected " + theirStorageInfoString;
+      response.sendError(HttpServletResponse.SC_FORBIDDEN, errorMsg);
+      LOG.warn("Received an invalid request file transfer request "
+          + "from a secondary with storage info " + theirStorageInfoString);
+      throw new IOException(errorMsg);
+    }
+  }
+
+  public static void setFileNameHeaders(HttpServletResponse response,
+      File file) {
+    response.setHeader(CONTENT_DISPOSITION, "attachment; filename=" +
+        file.getName());
+    response.setHeader(HADOOP_IMAGE_EDITS_HEADER, file.getName());
+  }
+  
+  /**
+   * Construct a throttler from conf
+   * @param conf configuration
+   * @return a data transfer throttler
+   */
+  public final static DataTransferThrottler getThrottler(Configuration conf) {
+    long transferBandwidth = 
+      conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
+                   DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
+    DataTransferThrottler throttler = null;
+    if (transferBandwidth > 0) {
+      throttler = new DataTransferThrottler(transferBandwidth);
+    }
+    return throttler;
+  }
+  
+  @VisibleForTesting
+  static boolean isValidRequestor(ServletContext context, String remoteUser,
+      Configuration conf) throws IOException {
+    if (remoteUser == null) { // This really shouldn't happen...
+      LOG.warn("Received null remoteUser while authorizing access to getImage servlet");
+      return false;
+    }
+
+    Set<String> validRequestors = new HashSet<String>();
+
+    validRequestors.add(SecurityUtil.getServerPrincipal(conf
+        .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+        NameNode.getAddress(conf).getHostName()));
+    validRequestors.add(SecurityUtil.getServerPrincipal(
+        conf.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
+        SecondaryNameNode.getHttpAddress(conf).getHostName()));
+
+    if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf))) {
+      Configuration otherNnConf = HAUtil.getConfForOtherNode(conf);
+      validRequestors.add(SecurityUtil.getServerPrincipal(otherNnConf
+          .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+          NameNode.getAddress(otherNnConf).getHostName()));
+    }
+
+    for (String v : validRequestors) {
+      if (v != null && v.equals(remoteUser)) {
+        LOG.info("ImageServlet allowing checkpointer: " + remoteUser);
+        return true;
+      }
+    }
+
+    if (HttpServer2.userHasAdministratorAccess(context, remoteUser)) {
+      LOG.info("ImageServlet allowing administrator: " + remoteUser);
+      return true;
+    }
+
+    LOG.info("ImageServlet rejecting: " + remoteUser);
+    return false;
+  }
+  
+  /**
+   * Set headers for content length, and, if available, md5.
+   * @throws IOException 
+   */
+  public static void setVerificationHeadersForGet(HttpServletResponse response,
+      File file) throws IOException {
+    response.setHeader(TransferFsImage.CONTENT_LENGTH,
+        String.valueOf(file.length()));
+    MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
+    if (hash != null) {
+      response.setHeader(TransferFsImage.MD5_HEADER, hash.toString());
+    }
+  }
+  
+  static String getParamStringForMostRecentImage() {
+    return "getimage=1&" + TXID_PARAM + "=" + LATEST_FSIMAGE_VALUE;
+  }
+
+  static String getParamStringForImage(NameNodeFile nnf, long txid,
+      StorageInfo remoteStorageInfo) {
+    final String imageType = nnf == null ? "" : "&" + IMAGE_FILE_TYPE + "="
+        + nnf.name();
+    return "getimage=1&" + TXID_PARAM + "=" + txid
+      + imageType
+      + "&" + STORAGEINFO_PARAM + "=" +
+      remoteStorageInfo.toColonSeparatedString();
+  }
+
+  static String getParamStringForLog(RemoteEditLog log,
+      StorageInfo remoteStorageInfo) {
+    return "getedit=1&" + START_TXID_PARAM + "=" + log.getStartTxId()
+        + "&" + END_TXID_PARAM + "=" + log.getEndTxId()
+        + "&" + STORAGEINFO_PARAM + "=" +
+          remoteStorageInfo.toColonSeparatedString();
+  }
+
+  static class GetImageParams {
+    private boolean isGetImage;
+    private boolean isGetEdit;
+    private NameNodeFile nnf;
+    private long startTxId, endTxId, txId;
+    private String storageInfoString;
+    private boolean fetchLatest;
+
+    /**
+     * @param request the object from which this servlet reads the url contents
+     * @param response the object into which this servlet writes the url contents
+     * @throws IOException if the request is bad
+     */
+    public GetImageParams(HttpServletRequest request,
+                          HttpServletResponse response
+                           ) throws IOException {
+      @SuppressWarnings("unchecked")
+      Map<String, String[]> pmap = request.getParameterMap();
+      isGetImage = isGetEdit = fetchLatest = false;
+
+      for (Map.Entry<String, String[]> entry : pmap.entrySet()) {
+        String key = entry.getKey();
+        String[] val = entry.getValue();
+        if (key.equals("getimage")) { 
+          isGetImage = true;
+          try {
+            txId = ServletUtil.parseLongParam(request, TXID_PARAM);
+            String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
+            nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
+                .valueOf(imageType);
+          } catch (NumberFormatException nfe) {
+            if (request.getParameter(TXID_PARAM).equals(LATEST_FSIMAGE_VALUE)) {
+              fetchLatest = true;
+            } else {
+              throw nfe;
+            }
+          }
+        } else if (key.equals("getedit")) { 
+          isGetEdit = true;
+          startTxId = ServletUtil.parseLongParam(request, START_TXID_PARAM);
+          endTxId = ServletUtil.parseLongParam(request, END_TXID_PARAM);
+        } else if (key.equals(STORAGEINFO_PARAM)) {
+          storageInfoString = val[0];
+        }
+      }
+
+      int numGets = (isGetImage?1:0) + (isGetEdit?1:0);
+      if ((numGets > 1) || (numGets == 0)) {
+        throw new IOException("Illegal parameters to TransferFsImage");
+      }
+    }
+
+    public String getStorageInfoString() {
+      return storageInfoString;
+    }
+
+    public long getTxId() {
+      Preconditions.checkState(isGetImage);
+      return txId;
+    }
+
+    public NameNodeFile getNameNodeFile() {
+      Preconditions.checkState(isGetImage);
+      return nnf;
+    }
+
+    public long getStartTxId() {
+      Preconditions.checkState(isGetEdit);
+      return startTxId;
+    }
+    
+    public long getEndTxId() {
+      Preconditions.checkState(isGetEdit);
+      return endTxId;
+    }
+
+    boolean isGetEdit() {
+      return isGetEdit;
+    }
+
+    boolean isGetImage() {
+      return isGetImage;
+    }
+
+    boolean shouldFetchLatest() {
+      return fetchLatest;
+    }
+    
+  }
+
+  /**
+   * Set headers for image length and if available, md5.
+   * 
+   * @throws IOException
+   */
+  static void setVerificationHeadersForPut(HttpURLConnection connection,
+      File file) throws IOException {
+    connection.setRequestProperty(TransferFsImage.CONTENT_LENGTH,
+        String.valueOf(file.length()));
+    MD5Hash hash = MD5FileUtils.readStoredMd5ForFile(file);
+    if (hash != null) {
+      connection
+          .setRequestProperty(TransferFsImage.MD5_HEADER, hash.toString());
+    }
+  }
+
+  /**
+   * Set the required parameters for uploading image
+   * 
+   * @param httpMethod instance of method to set the parameters
+   * @param storage colon separated storageInfo string
+   * @param txid txid of the image
+   * @param imageFileSize size of the imagefile to be uploaded
+   * @param nnf NameNodeFile Type
+   * @return Returns map of parameters to be used with PUT request.
+   */
+  static Map<String, String> getParamsForPutImage(Storage storage, long txid,
+      long imageFileSize, NameNodeFile nnf) {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(TXID_PARAM, Long.toString(txid));
+    params.put(STORAGEINFO_PARAM, storage.toColonSeparatedString());
+    // setting the length of the file to be uploaded in separate property as
+    // Content-Length only supports up to 2GB
+    params.put(TransferFsImage.FILE_LENGTH, Long.toString(imageFileSize));
+    params.put(IMAGE_FILE_TYPE, nnf.name());
+    return params;
+  }
+
+  @Override
+  protected void doPut(final HttpServletRequest request,
+      final HttpServletResponse response) throws ServletException, IOException {
+    try {
+      ServletContext context = getServletContext();
+      final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
+      final Configuration conf = (Configuration) getServletContext()
+          .getAttribute(JspHelper.CURRENT_CONF);
+      final PutImageParams parsedParams = new PutImageParams(request, response,
+          conf);
+      final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+
+      validateRequest(context, conf, request, response, nnImage,
+          parsedParams.getStorageInfoString());
+
+      UserGroupInformation.getCurrentUser().doAs(
+          new PrivilegedExceptionAction<Void>() {
+
+            @Override
+            public Void run() throws Exception {
+
+              final long txid = parsedParams.getTxId();
+
+              final NameNodeFile nnf = parsedParams.getNameNodeFile();
+
+              if (!currentlyDownloadingCheckpoints.add(txid)) {
+                response.sendError(HttpServletResponse.SC_CONFLICT,
+                    "Another checkpointer is already in the process of uploading a"
+                        + " checkpoint made at transaction ID " + txid);
+                return null;
+              }
+              try {
+                if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
+                  response.sendError(HttpServletResponse.SC_CONFLICT,
+                      "Another checkpointer already uploaded an checkpoint "
+                          + "for txid " + txid);
+                  return null;
+                }
+
+                InputStream stream = request.getInputStream();
+                try {
+                  long start = now();
+                  MD5Hash downloadImageDigest = TransferFsImage
+                      .handleUploadImageRequest(request, txid,
+                          nnImage.getStorage(), stream,
+                          parsedParams.getFileSize(), getThrottler(conf));
+                  nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
+                      downloadImageDigest);
+                  // Metrics non-null only when used inside name node
+                  if (metrics != null) {
+                    long elapsed = now() - start;
+                    metrics.addPutImage(elapsed);
+                  }
+                  // Now that we have a new checkpoint, we might be able to
+                  // remove some old ones.
+                  nnImage.purgeOldStorage(nnf);
+                } finally {
+                  stream.close();
+                }
+              } finally {
+                currentlyDownloadingCheckpoints.remove(txid);
+              }
+              return null;
+            }
+
+          });
+    } catch (Throwable t) {
+      String errMsg = "PutImage failed. " + StringUtils.stringifyException(t);
+      response.sendError(HttpServletResponse.SC_GONE, errMsg);
+      throw new IOException(errMsg);
+    }
+  }
+
+  /*
+   * Params required to handle put image request
+   */
+  static class PutImageParams {
+    private long txId = -1;
+    private String storageInfoString = null;
+    private long fileSize = 0L;
+    private NameNodeFile nnf;
+
+    public PutImageParams(HttpServletRequest request,
+        HttpServletResponse response, Configuration conf) throws IOException {
+      txId = ServletUtil.parseLongParam(request, TXID_PARAM);
+      storageInfoString = ServletUtil.getParameter(request, STORAGEINFO_PARAM);
+      fileSize = ServletUtil.parseLongParam(request,
+          TransferFsImage.FILE_LENGTH);
+      String imageType = ServletUtil.getParameter(request, IMAGE_FILE_TYPE);
+      nnf = imageType == null ? NameNodeFile.IMAGE : NameNodeFile
+          .valueOf(imageType);
+      if (fileSize == 0 || txId == -1 || storageInfoString == null
+          || storageInfoString.isEmpty()) {
+        throw new IOException("Illegal parameters to TransferFsImage");
+      }
+    }
+
+    public long getTxId() {
+      return txId;
+    }
+
+    public String getStorageInfoString() {
+      return storageInfoString;
+    }
+
+    public long getFileSize() {
+      return fileSize;
+    }
+
+    public NameNodeFile getNameNodeFile() {
+      return nnf;
+    }
+  }
+}

Propchange: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java?rev=1575457&r1=1575456&r2=1575457&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java Sat Mar  8 00:39:23 2014
@@ -233,8 +233,8 @@ public class NameNodeHttpServer {
         CancelDelegationTokenServlet.class, true);
     httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
         true);
-    httpServer.addInternalServlet("getimage", "/getimage",
-        GetImageServlet.class, true);
+    httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
+        ImageServlet.class, true);
     httpServer.addInternalServlet("listPaths", "/listPaths/*",
         ListPathsServlet.class, false);
     httpServer.addInternalServlet("data", "/data/*",

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1575457&r1=1575456&r2=1575457&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Sat Mar  8 00:39:23 2014
@@ -114,7 +114,6 @@ public class SecondaryNameNode implement
   private InetSocketAddress nameNodeAddr;
   private volatile boolean shouldRun;
   private HttpServer2 infoServer;
-  private URL imageListenURL;
 
   private Collection<URI> checkpointDirs;
   private List<URI> checkpointEditsDirs;
@@ -267,13 +266,11 @@ public class SecondaryNameNode implement
     infoServer.setAttribute("secondary.name.node", this);
     infoServer.setAttribute("name.system.image", checkpointImage);
     infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
-    infoServer.addInternalServlet("getimage", "/getimage",
-                                  GetImageServlet.class, true);
+    infoServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
+        ImageServlet.class, true);
     infoServer.start();
 
     LOG.info("Web server init done");
-    imageListenURL = new URL(DFSUtil.getHttpClientScheme(conf) + "://"
-        + NetUtils.getHostPortString(infoServer.getConnectorAddress(0)));
 
     HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
     int connIdx = 0;
@@ -487,14 +484,6 @@ public class SecondaryNameNode implement
     LOG.debug("Will connect to NameNode at " + address);
     return address.toURL();
   }
-  
-  /**
-   * Return the host:port of where this SecondaryNameNode is listening
-   * for image transfers
-   */
-  private URL getImageListenAddress() {
-    return imageListenURL;
-  }
 
   /**
    * Create a new checkpoint
@@ -555,8 +544,8 @@ public class SecondaryNameNode implement
     // to make this new uploaded image as the most current image.
     //
     long txid = checkpointImage.getLastAppliedTxId();
-    TransferFsImage.uploadImageFromStorage(fsName, getImageListenAddress(),
-        dstStorage, NameNodeFile.IMAGE, txid);
+    TransferFsImage.uploadImageFromStorage(fsName, conf, dstStorage,
+        NameNodeFile.IMAGE, txid);
 
     // error simulation code for junit test
     CheckpointFaultInjector.getInstance().afterSecondaryUploadsNewImage();

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java?rev=1575457&r1=1575456&r2=1575457&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java Sat Mar  8 00:39:23 2014
@@ -19,18 +19,22 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.HttpURLConnection;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
-import javax.servlet.ServletOutputStream;
-import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.logging.Log;
@@ -49,10 +53,12 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.util.Time;
+import org.apache.http.client.utils.URIBuilder;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -65,7 +71,12 @@ import com.google.common.collect.Lists;
 public class TransferFsImage {
   
   public final static String CONTENT_LENGTH = "Content-Length";
+  public final static String FILE_LENGTH = "File-Length";
   public final static String MD5_HEADER = "X-MD5-Digest";
+
+  private final static String CONTENT_TYPE = "Content-Type";
+  private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
+
   @VisibleForTesting
   static int timeout = 0;
   private static URLConnectionFactory connectionFactory;
@@ -82,14 +93,14 @@ public class TransferFsImage {
   
   public static void downloadMostRecentImageToDirectory(URL infoServer,
       File dir) throws IOException {
-    String fileId = GetImageServlet.getParamStringForMostRecentImage();
+    String fileId = ImageServlet.getParamStringForMostRecentImage();
     getFileClient(infoServer, fileId, Lists.newArrayList(dir),
         null, false);
   }
 
   public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
       Storage dstStorage, boolean needDigest) throws IOException {
-    String fileid = GetImageServlet.getParamStringForImage(null,
+    String fileid = ImageServlet.getParamStringForImage(null,
         imageTxId, dstStorage);
     String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
     
@@ -104,12 +115,31 @@ public class TransferFsImage {
         dstFiles.get(0).length() + " bytes.");
     return hash;
   }
-  
+
+  static MD5Hash handleUploadImageRequest(HttpServletRequest request,
+      long imageTxId, Storage dstStorage, InputStream stream,
+      long advertisedSize, DataTransferThrottler throttler) throws IOException {
+
+    String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
+
+    List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.IMAGE, fileName);
+    if (dstFiles.isEmpty()) {
+      throw new IOException("No targets in destination storage!");
+    }
+
+    MD5Hash advertisedDigest = parseMD5Header(request);
+    MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
+        advertisedSize, advertisedDigest, fileName, stream, throttler);
+    LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
+        + dstFiles.get(0).length() + " bytes.");
+    return hash;
+  }
+
   static void downloadEditsToStorage(URL fsName, RemoteEditLog log,
       NNStorage dstStorage) throws IOException {
     assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
       "bad log: " + log;
-    String fileid = GetImageServlet.getParamStringForLog(
+    String fileid = ImageServlet.getParamStringForLog(
         log, dstStorage);
     String finalFileName = NNStorage.getFinalizedEditsFileName(
         log.getStartTxId(), log.getEndTxId());
@@ -159,22 +189,19 @@ public class TransferFsImage {
    * Requests that the NameNode download an image from this node.
    *
    * @param fsName the http address for the remote NN
-   * @param myNNAddress the host/port where the local node is running an
-   *                           HTTPServer hosting GetImageServlet
+   * @param conf Configuration
    * @param storage the storage directory to transfer the image from
    * @param nnf the NameNodeFile type of the image
    * @param txid the transaction ID of the image to be uploaded
    */
-  public static void uploadImageFromStorage(URL fsName, URL myNNAddress,
-      Storage storage, NameNodeFile nnf, long txid) throws IOException {
+  public static void uploadImageFromStorage(URL fsName, Configuration conf,
+      NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
     
-    String fileid = GetImageServlet.getParamStringToPutImage(nnf, txid,
-        myNNAddress, storage);
-    // this doesn't directly upload an image, but rather asks the NN
-    // to connect back to the 2NN to download the specified image.
+    URL url = new URL(fsName, ImageServlet.PATH_SPEC);
+    long startTime = Time.monotonicNow();
     try {
-      TransferFsImage.getFileClient(fsName, fileid, null, null, false);
-    } catch (HttpGetFailedException e) {
+      uploadImage(url, conf, storage, nnf, txid);
+    } catch (HttpPutFailedException e) {
       if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
         // this is OK - this means that a previous attempt to upload
         // this checkpoint succeeded even though we thought it failed.
@@ -186,25 +213,105 @@ public class TransferFsImage {
         throw e;
       }
     }
-    LOG.info("Uploaded image with txid " + txid + " to namenode at " +
-    		fsName);
+    double xferSec = Math.max(
+        ((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001);
+    LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName
+        + " in " + xferSec + " seconds");
+  }
+
+  /*
+   * Uploads the imagefile using HTTP PUT method
+   */
+  private static void uploadImage(URL url, Configuration conf,
+      NNStorage storage, NameNodeFile nnf, long txId) throws IOException {
+
+    File imageFile = storage.findImageFile(nnf, txId);
+    if (imageFile == null) {
+      throw new IOException("Could not find image with txid " + txId);
+    }
+
+    HttpURLConnection connection = null;
+    try {
+      URIBuilder uriBuilder = new URIBuilder(url.toURI());
+
+      // write all params for image upload request as query itself.
+      // Request body contains the image to be uploaded.
+      Map<String, String> params = ImageServlet.getParamsForPutImage(storage,
+          txId, imageFile.length(), nnf);
+      for (Entry<String, String> entry : params.entrySet()) {
+        uriBuilder.addParameter(entry.getKey(), entry.getValue());
+      }
+
+      URL urlWithParams = uriBuilder.build().toURL();
+      connection = (HttpURLConnection) connectionFactory.openConnection(
+          urlWithParams, UserGroupInformation.isSecurityEnabled());
+      // Set the request to PUT
+      connection.setRequestMethod("PUT");
+      connection.setDoOutput(true);
+
+      
+      int chunkSize = conf.getInt(
+          DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY,
+          DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT);
+      if (imageFile.length() > chunkSize) {
+        // using chunked streaming mode to support upload of 2GB+ files and to
+        // avoid internal buffering.
+        // this mode should be used only if more than chunkSize data is present
+        // to upload. otherwise upload may not happen sometimes.
+        connection.setChunkedStreamingMode(chunkSize);
+      }
+
+      setTimeout(connection);
+
+      // set headers for verification
+      ImageServlet.setVerificationHeadersForPut(connection, imageFile);
+
+      // Write the file to output stream.
+      writeFileToPutRequest(conf, connection, imageFile);
+
+      int responseCode = connection.getResponseCode();
+      if (responseCode != HttpURLConnection.HTTP_OK) {
+        throw new HttpPutFailedException(connection.getResponseMessage(),
+            responseCode);
+      }
+    } catch (AuthenticationException e) {
+      throw new IOException(e);
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    } finally {
+      if (connection != null) {
+        connection.disconnect();
+      }
+    }
+  }
+
+  private static void writeFileToPutRequest(Configuration conf,
+      HttpURLConnection connection, File imageFile)
+      throws FileNotFoundException, IOException {
+    connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
+    connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
+    OutputStream output = connection.getOutputStream();
+    FileInputStream input = new FileInputStream(imageFile);
+    try {
+      copyFileToStream(output, imageFile, input,
+          ImageServlet.getThrottler(conf));
+    } finally {
+      IOUtils.closeStream(input);
+      IOUtils.closeStream(output);
+    }
   }
 
-  
   /**
    * A server-side method to respond to a getfile http request
    * Copies the contents of the local file into the output stream.
    */
-  public static void getFileServer(ServletResponse response, File localfile,
-      FileInputStream infile,
-      DataTransferThrottler throttler) 
+  public static void copyFileToStream(OutputStream out, File localfile,
+      FileInputStream infile, DataTransferThrottler throttler)
     throws IOException {
     byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
-    ServletOutputStream out = null;
     try {
       CheckpointFaultInjector.getInstance()
           .aboutToSendFile(localfile);
-      out = response.getOutputStream();
 
       if (CheckpointFaultInjector.getInstance().
             shouldSendShortFile(localfile)) {
@@ -250,14 +357,13 @@ public class TransferFsImage {
   static MD5Hash getFileClient(URL infoServer,
       String queryString, List<File> localPaths,
       Storage dstStorage, boolean getChecksum) throws IOException {
-    URL url = new URL(infoServer, "/getimage?" + queryString);
+    URL url = new URL(infoServer, ImageServlet.PATH_SPEC + "?" + queryString);
     LOG.info("Opening connection to " + url);
     return doGetUrl(url, localPaths, dstStorage, getChecksum);
   }
   
   public static MD5Hash doGetUrl(URL url, List<File> localPaths,
       Storage dstStorage, boolean getChecksum) throws IOException {
-    long startTime = Time.monotonicNow();
     HttpURLConnection connection;
     try {
       connection = (HttpURLConnection)
@@ -266,16 +372,7 @@ public class TransferFsImage {
       throw new IOException(e);
     }
 
-    if (timeout <= 0) {
-      Configuration conf = new HdfsConfiguration();
-      timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
-          DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
-    }
-
-    if (timeout > 0) {
-      connection.setConnectTimeout(timeout);
-      connection.setReadTimeout(timeout);
-    }
+    setTimeout(connection);
 
     if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
       throw new HttpGetFailedException(
@@ -293,10 +390,37 @@ public class TransferFsImage {
       throw new IOException(CONTENT_LENGTH + " header is not provided " +
                             "by the namenode when trying to fetch " + url);
     }
-    
+    MD5Hash advertisedDigest = parseMD5Header(connection);
+    String fsImageName = connection
+        .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
+    InputStream stream = connection.getInputStream();
+
+    return receiveFile(url.toExternalForm(), localPaths, dstStorage,
+        getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
+        null);
+  }
+
+  private static void setTimeout(HttpURLConnection connection) {
+    if (timeout <= 0) {
+      Configuration conf = new HdfsConfiguration();
+      timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
+          DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
+      LOG.info("Image Transfer timeout configured to " + timeout
+          + " milliseconds");
+    }
+
+    if (timeout > 0) {
+      connection.setConnectTimeout(timeout);
+      connection.setReadTimeout(timeout);
+    }
+  }
+
+  private static MD5Hash receiveFile(String url, List<File> localPaths,
+      Storage dstStorage, boolean getChecksum, long advertisedSize,
+      MD5Hash advertisedDigest, String fsImageName, InputStream stream,
+      DataTransferThrottler throttler) throws IOException {
+    long startTime = Time.monotonicNow();
     if (localPaths != null) {
-      String fsImageName = connection.getHeaderField(
-          GetImageServlet.HADOOP_IMAGE_EDITS_HEADER);
       // If the local paths refer to directories, use the server-provided header
       // as the filename within that directory
       List<File> newLocalPaths = new ArrayList<File>();
@@ -313,10 +437,8 @@ public class TransferFsImage {
       localPaths = newLocalPaths;
     }
     
-    MD5Hash advertisedDigest = parseMD5Header(connection);
 
     long received = 0;
-    InputStream stream = connection.getInputStream();
     MessageDigest digester = null;
     if (getChecksum) {
       digester = MD5Hash.getDigester();
@@ -361,6 +483,9 @@ public class TransferFsImage {
           for (FileOutputStream fos : outputStreams) {
             fos.write(buf, 0, num);
           }
+          if (throttler != null) {
+            throttler.throttle(num);
+          }
         }
       }
       finishedReceiving = true;
@@ -404,7 +529,12 @@ public class TransferFsImage {
     String header = connection.getHeaderField(MD5_HEADER);
     return (header != null) ? new MD5Hash(header) : null;
   }
-  
+
+  private static MD5Hash parseMD5Header(HttpServletRequest request) {
+    String header = request.getHeader(MD5_HEADER);
+    return (header != null) ? new MD5Hash(header) : null;
+  }
+
   public static class HttpGetFailedException extends IOException {
     private static final long serialVersionUID = 1L;
     private final int responseCode;
@@ -419,4 +549,18 @@ public class TransferFsImage {
     }
   }
 
+  public static class HttpPutFailedException extends IOException {
+    private static final long serialVersionUID = 1L;
+    private final int responseCode;
+
+    HttpPutFailedException(String msg, int responseCode) throws IOException {
+      super(msg);
+      this.responseCode = responseCode;
+    }
+
+    public int getResponseCode() {
+      return responseCode;
+    }
+  }
+
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java?rev=1575457&r1=1575456&r2=1575457&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java Sat Mar  8 00:39:23 2014
@@ -63,6 +63,7 @@ public class StandbyCheckpointer {
   private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class);
   private static final long PREVENT_AFTER_CANCEL_MS = 2*60*1000L;
   private final CheckpointConf checkpointConf;
+  private final Configuration conf;
   private final FSNamesystem namesystem;
   private long lastCheckpointTime;
   private final CheckpointerThread thread;
@@ -80,6 +81,7 @@ public class StandbyCheckpointer {
   public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
       throws IOException {
     this.namesystem = ns;
+    this.conf = conf;
     this.checkpointConf = new CheckpointConf(conf); 
     this.thread = new CheckpointerThread();
     this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
@@ -193,7 +195,7 @@ public class StandbyCheckpointer {
     Future<Void> upload = executor.submit(new Callable<Void>() {
       @Override
       public Void call() throws IOException {
-        TransferFsImage.uploadImageFromStorage(activeNNAddress, myNNAddress,
+        TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
             namesystem.getFSImage().getStorage(), imageType, txid);
         return null;
       }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1575457&r1=1575456&r2=1575457&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Sat Mar  8 00:39:23 2014
@@ -858,15 +858,13 @@
 
 <property>
   <name>dfs.image.transfer.timeout</name>
-  <value>600000</value>
+  <value>60000</value>
   <description>
-        Timeout for image transfer in milliseconds. This timeout and the related
+        Socket timeout for image transfer in milliseconds. This timeout and the related
         dfs.image.transfer.bandwidthPerSec parameter should be configured such
-        that normal image transfer can complete within the timeout.
+        that normal image transfer can complete successfully.
         This timeout prevents client hangs when the sender fails during
-        image transfer, which is particularly important during checkpointing.
-        Note that this timeout applies to the entirety of image transfer, and
-        is not a socket timeout.
+        image transfer. This is socket timeout during image tranfer.
   </description>
 </property>
 
@@ -884,6 +882,16 @@
 </property>
 
 <property>
+  <name>dfs.image.transfer.chunksize</name>
+  <value>65536</value>
+  <description>
+        Chunksize in bytes to upload the checkpoint.
+        Chunked streaming is used to avoid internal buffering of contents
+        of image file of huge size.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.support.allow.format</name>
   <value>true</value>
   <description>Does HDFS namenode allow itself to be formatted?

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java?rev=1575457&r1=1575456&r2=1575457&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java Sat Mar  8 00:39:23 2014
@@ -31,6 +31,7 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -554,23 +555,9 @@ public class TestCheckpoint {
   }
 
   /**
-   * Simulate a secondary node failure to transfer image
-   * back to the name-node.
-   * Used to truncate primary fsimage file.
-   */
-  @Test
-  public void testSecondaryFailsToReturnImage() throws IOException {
-    Mockito.doThrow(new IOException("If this exception is not caught by the " +
-        "name-node, fs image will be truncated."))
-        .when(faultInjector).aboutToSendFile(filePathContaining("secondary"));
-
-    doSecondaryFailsToReturnImage();
-  }
-  
-  /**
-   * Similar to above test, but uses an unchecked Error, and causes it
-   * before even setting the length header. This used to cause image
-   * truncation. Regression test for HDFS-3330.
+   * Simulate a secondary node failure to transfer image. Uses an unchecked
+   * error and fail transfer before even setting the length header. This used to
+   * cause image truncation. Regression test for HDFS-3330.
    */
   @Test
   public void testSecondaryFailsWithErrorBeforeSettingHeaders()
@@ -1975,7 +1962,14 @@ public class TestCheckpoint {
       Mockito.doReturn(Lists.newArrayList(new File("/wont-be-written")))
         .when(dstImage).getFiles(
             Mockito.<NameNodeDirType>anyObject(), Mockito.anyString());
-      
+
+      File mockImageFile = File.createTempFile("image", "");
+      FileOutputStream imageFile = new FileOutputStream(mockImageFile);
+      imageFile.write("data".getBytes());
+      imageFile.close();
+      Mockito.doReturn(mockImageFile).when(dstImage)
+          .findImageFile(Mockito.any(NameNodeFile.class), Mockito.anyLong());
+
       Mockito.doReturn(new StorageInfo(1, 1, "X", 1, NodeType.NAME_NODE).toColonSeparatedString())
         .when(dstImage).toColonSeparatedString();
 
@@ -1996,8 +1990,8 @@ public class TestCheckpoint {
       }
 
       try {
-        TransferFsImage.uploadImageFromStorage(fsName, new URL(
-            "http://localhost:1234"), dstImage, NameNodeFile.IMAGE, 0);
+        TransferFsImage.uploadImageFromStorage(fsName, conf, dstImage,
+            NameNodeFile.IMAGE, 0);
         fail("Storage info was not verified");
       } catch (IOException ioe) {
         String msg = StringUtils.stringifyException(ioe);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java?rev=1575457&r1=1575456&r2=1575457&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java Sat Mar  8 00:39:23 2014
@@ -69,7 +69,7 @@ public class TestGetImageServlet {
     Mockito.when(context.getAttribute(HttpServer2.ADMINS_ACL)).thenReturn(acls);
     
     // Make sure that NN2 is considered a valid fsimage/edits requestor.
-    assertTrue(GetImageServlet.isValidRequestor(context,
+    assertTrue(ImageServlet.isValidRequestor(context,
         "hdfs/host2@TEST-REALM.COM", conf));
     
     // Mark atm as an admin.
@@ -81,15 +81,15 @@ public class TestGetImageServlet {
     }))).thenReturn(true);
     
     // Make sure that NN2 is still considered a valid requestor.
-    assertTrue(GetImageServlet.isValidRequestor(context,
+    assertTrue(ImageServlet.isValidRequestor(context,
         "hdfs/host2@TEST-REALM.COM", conf));
     
     // Make sure an admin is considered a valid requestor.
-    assertTrue(GetImageServlet.isValidRequestor(context,
+    assertTrue(ImageServlet.isValidRequestor(context,
         "atm@TEST-REALM.COM", conf));
     
     // Make sure other users are *not* considered valid requestors.
-    assertFalse(GetImageServlet.isValidRequestor(context,
+    assertFalse(ImageServlet.isValidRequestor(context,
         "todd@TEST-REALM.COM", conf));
   }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java?rev=1575457&r1=1575456&r2=1575457&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java Sat Mar  8 00:39:23 2014
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.net.URL;
@@ -34,9 +35,11 @@ import javax.servlet.http.HttpServletReq
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.http.HttpServerFunctionalTest;
 import org.apache.hadoop.test.PathUtils;
@@ -118,10 +121,11 @@ public class TestTransferFsImage {
    * Test to verify the read timeout
    */
   @Test(timeout = 5000)
-  public void testImageTransferTimeout() throws Exception {
+  public void testGetImageTimeout() throws Exception {
     HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
     try {
-      testServer.addServlet("GetImage", "/getimage", TestGetImageServlet.class);
+      testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
+          TestImageTransferServlet.class);
       testServer.start();
       URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
       TransferFsImage.timeout = 2000;
@@ -139,7 +143,48 @@ public class TestTransferFsImage {
     }
   }
 
-  public static class TestGetImageServlet extends HttpServlet {
+  /**
+   * Test to verify the timeout of Image upload
+   */
+  @Test(timeout = 10000)
+  public void testImageUploadTimeout() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    NNStorage mockStorage = Mockito.mock(NNStorage.class);
+    HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
+    try {
+      testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
+          TestImageTransferServlet.class);
+      testServer.start();
+      URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
+      // set the timeout here, otherwise it will take default.
+      TransferFsImage.timeout = 2000;
+
+      File tmpDir = new File(new FileSystemTestHelper().getTestRootDir());
+      tmpDir.mkdirs();
+
+      File mockImageFile = File.createTempFile("image", "", tmpDir);
+      FileOutputStream imageFile = new FileOutputStream(mockImageFile);
+      imageFile.write("data".getBytes());
+      imageFile.close();
+      Mockito.when(
+          mockStorage.findImageFile(Mockito.any(NameNodeFile.class),
+              Mockito.anyLong())).thenReturn(mockImageFile);
+      Mockito.when(mockStorage.toColonSeparatedString()).thenReturn(
+          "storage:info:string");
+      
+      try {
+        TransferFsImage.uploadImageFromStorage(serverURL, conf, mockStorage,
+            NameNodeFile.IMAGE, 1L);
+        fail("TransferImage Should fail with timeout");
+      } catch (SocketTimeoutException e) {
+        assertEquals("Upload should timeout", "Read timed out", e.getMessage());
+      }
+    } finally {
+      testServer.stop();
+    }
+  }
+
+  public static class TestImageTransferServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
 
     @Override
@@ -153,5 +198,17 @@ public class TestTransferFsImage {
         }
       }
     }
+
+    @Override
+    protected void doPut(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+      synchronized (this) {
+        try {
+          wait(5000);
+        } catch (InterruptedException e) {
+          // Ignore
+        }
+      }
+    }
   }
 }



Mime
View raw message