hadoop-hdfs-commits mailing list archives

From: w...@apache.org
Subject: svn commit: r1576157 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project: ./ hadoop-hdfs/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/ hadoop-hdfs/s...
Date: Tue, 11 Mar 2014 00:44:57 GMT
Author: wang
Date: Tue Mar 11 00:44:56 2014
New Revision: 1576157

URL: http://svn.apache.org/r1576157
Log:
HDFS-3405. Checkpointing should use HTTP POST or PUT instead of GET-GET to send merged fsimages.
Contributed by Vinayakumar B.
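
The "GET-GET" flow being removed worked as follows: the checkpointing node (SecondaryNameNode, BackupNode, or standby NN) issued a GET against the NameNode's GetImageServlet with parameters built by getParamStringToPutImage(), and the NameNode then connected back with a second GET to pull the merged fsimage from the checkpointer. After this change the checkpointer pushes the image directly to the NameNode's new ImageServlet with an HTTP PUT, streaming it in chunks. Below is a minimal sketch of the new direction, not the committed code: the class and variable names are illustrative, the "/imagetransfer" path is assumed from the servlet name (the code uses ImageServlet.PATH_SPEC), and the single "txid" query parameter stands in for the full parameter set built by ImageServlet.getParamsForPutImage(). See TransferFsImage.uploadImage() in the diff below for the real implementation, which also sets MD5 verification headers and honors dfs.image.transfer.chunksize.

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class PutImageSketch {
      // Push a merged fsimage to the NameNode with a single HTTP PUT instead of
      // asking the NN to fetch it back over a second GET.
      static void putImage(URL nnHttpAddress, File imageFile, long txid)
          throws IOException {
        URL url = new URL(nnHttpAddress, "/imagetransfer?txid=" + txid);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        // Chunked streaming keeps a multi-GB image from being buffered in memory.
        conn.setChunkedStreamingMode(64 * 1024);
        InputStream in = new FileInputStream(imageFile);
        OutputStream out = conn.getOutputStream();
        try {
          byte[] buf = new byte[64 * 1024];
          int n;
          while ((n = in.read(buf)) > 0) {
            out.write(buf, 0, n);
          }
        } finally {
          in.close();
          out.close();
        }
        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
          throw new IOException("Image upload failed with HTTP "
              + conn.getResponseCode() + ": " + conn.getResponseMessage());
        }
      }
    }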

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
      - copied unchanged from r1575611, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
Removed:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project:r1575611

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1575611

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1576157&r1=1576156&r2=1576157&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Mar 11 00:44:56 2014
@@ -149,6 +149,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-6055. Change default configuration to limit file name length in HDFS.
     (cnauroth)
 
+    HDFS-3405. Checkpointing should use HTTP POST or PUT instead of GET-GET
+    to send merged fsimages. (Vinayakumar B via wang)
+
   OPTIMIZATIONS
 
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1575611

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1576157&r1=1576156&r2=1576157&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Mar 11 00:44:56 2014
@@ -462,7 +462,11 @@ public class DFSConfigKeys extends Commo
 
   // Image transfer timeout
   public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
-  public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 10 * 60 * 1000;
+  public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
+
+  // Image transfer chunksize
+  public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize";
+  public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024;
 
   //Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
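
Two defaults change above: dfs.image.transfer.timeout drops from 10 minutes to 60 seconds (it now acts as a socket-level timeout rather than a whole-transfer timeout, per the hdfs-default.xml hunk further down), and the new dfs.image.transfer.chunksize key (64 KB default) sets the chunked-streaming granularity of the image upload. A small, purely illustrative fragment showing how these keys are read; the committed code does the equivalent lookups inside TransferFsImage.setTimeout() and uploadImage() rather than at one call site:

    // Illustrative fragment, not committed code.
    Configuration conf = new HdfsConfiguration();
    int timeout = conf.getInt(
        DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,         // "dfs.image.transfer.timeout"
        DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);    // 60 * 1000 ms
    int chunkSize = conf.getInt(
        DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY,       // "dfs.image.transfer.chunksize"
        DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT);  // 64 * 1024 bytes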

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java?rev=1576157&r1=1576156&r2=1576157&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java Tue Mar 11 00:44:56 2014
@@ -42,7 +42,7 @@ import org.apache.hadoop.hdfs.qjournal.c
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
-import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
+import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
 import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -198,15 +198,16 @@ public class GetJournalEditServlet exten
           return;
         }
         editFile = elf.getFile();
-        GetImageServlet.setVerificationHeaders(response, editFile);
-        GetImageServlet.setFileNameHeaders(response, editFile);
+        ImageServlet.setVerificationHeadersForGet(response, editFile);
+        ImageServlet.setFileNameHeaders(response, editFile);
         editFileIn = new FileInputStream(editFile);
       }
       
-      DataTransferThrottler throttler = GetImageServlet.getThrottler(conf);
+      DataTransferThrottler throttler = ImageServlet.getThrottler(conf);
 
       // send edits
-      TransferFsImage.getFileServer(response, editFile, editFileIn, throttler);
+      TransferFsImage.copyFileToStream(response.getOutputStream(), editFile,
+          editFileIn, throttler);
 
     } catch (Throwable t) {
       String errMsg = "getedit failed. " + StringUtils.stringifyException(t);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java?rev=1576157&r1=1576156&r2=1576157&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java Tue Mar 11 00:44:56 2014
@@ -263,8 +263,7 @@ class Checkpointer extends Daemon {
     }
 
     if(cpCmd.needToReturnImage()) {
-      TransferFsImage.uploadImageFromStorage(
-          backupNode.nnHttpAddress, getImageListenAddress(),
+      TransferFsImage.uploadImageFromStorage(backupNode.nnHttpAddress, conf,
           bnStorage, NameNodeFile.IMAGE, txid);
     }
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java?rev=1576157&r1=1576156&r2=1576157&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java Tue Mar 11 00:44:56 2014
@@ -233,8 +233,8 @@ public class NameNodeHttpServer {
         CancelDelegationTokenServlet.class, true);
     httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
         true);
-    httpServer.addInternalServlet("getimage", "/getimage",
-        GetImageServlet.class, true);
+    httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
+        ImageServlet.class, true);
     httpServer.addInternalServlet("listPaths", "/listPaths/*",
         ListPathsServlet.class, false);
     httpServer.addInternalServlet("data", "/data/*",

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1576157&r1=1576156&r2=1576157&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Tue Mar 11 00:44:56 2014
@@ -114,7 +114,6 @@ public class SecondaryNameNode implement
   private InetSocketAddress nameNodeAddr;
   private volatile boolean shouldRun;
   private HttpServer2 infoServer;
-  private URL imageListenURL;
 
   private Collection<URI> checkpointDirs;
   private List<URI> checkpointEditsDirs;
@@ -267,13 +266,11 @@ public class SecondaryNameNode implement
     infoServer.setAttribute("secondary.name.node", this);
     infoServer.setAttribute("name.system.image", checkpointImage);
     infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
-    infoServer.addInternalServlet("getimage", "/getimage",
-                                  GetImageServlet.class, true);
+    infoServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
+        ImageServlet.class, true);
     infoServer.start();
 
     LOG.info("Web server init done");
-    imageListenURL = new URL(DFSUtil.getHttpClientScheme(conf) + "://"
-        + NetUtils.getHostPortString(infoServer.getConnectorAddress(0)));
 
     HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf);
     int connIdx = 0;
@@ -487,14 +484,6 @@ public class SecondaryNameNode implement
     LOG.debug("Will connect to NameNode at " + address);
     return address.toURL();
   }
-  
-  /**
-   * Return the host:port of where this SecondaryNameNode is listening
-   * for image transfers
-   */
-  private URL getImageListenAddress() {
-    return imageListenURL;
-  }
 
   /**
    * Create a new checkpoint
@@ -555,8 +544,8 @@ public class SecondaryNameNode implement
     // to make this new uploaded image as the most current image.
     //
     long txid = checkpointImage.getLastAppliedTxId();
-    TransferFsImage.uploadImageFromStorage(fsName, getImageListenAddress(),
-        dstStorage, NameNodeFile.IMAGE, txid);
+    TransferFsImage.uploadImageFromStorage(fsName, conf, dstStorage,
+        NameNodeFile.IMAGE, txid);
 
     // error simulation code for junit test
     CheckpointFaultInjector.getInstance().afterSecondaryUploadsNewImage();

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java?rev=1576157&r1=1576156&r2=1576157&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java Tue Mar 11 00:44:56 2014
@@ -19,18 +19,22 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.HttpURLConnection;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
-import javax.servlet.ServletOutputStream;
-import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.logging.Log;
@@ -49,10 +53,12 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.util.Time;
+import org.apache.http.client.utils.URIBuilder;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -65,7 +71,12 @@ import com.google.common.collect.Lists;
 public class TransferFsImage {
   
   public final static String CONTENT_LENGTH = "Content-Length";
+  public final static String FILE_LENGTH = "File-Length";
   public final static String MD5_HEADER = "X-MD5-Digest";
+
+  private final static String CONTENT_TYPE = "Content-Type";
+  private final static String CONTENT_TRANSFER_ENCODING = "Content-Transfer-Encoding";
+
   @VisibleForTesting
   static int timeout = 0;
   private static URLConnectionFactory connectionFactory;
@@ -82,14 +93,14 @@ public class TransferFsImage {
   
   public static void downloadMostRecentImageToDirectory(URL infoServer,
       File dir) throws IOException {
-    String fileId = GetImageServlet.getParamStringForMostRecentImage();
+    String fileId = ImageServlet.getParamStringForMostRecentImage();
     getFileClient(infoServer, fileId, Lists.newArrayList(dir),
         null, false);
   }
 
   public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
       Storage dstStorage, boolean needDigest) throws IOException {
-    String fileid = GetImageServlet.getParamStringForImage(null,
+    String fileid = ImageServlet.getParamStringForImage(null,
         imageTxId, dstStorage);
     String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
     
@@ -104,12 +115,31 @@ public class TransferFsImage {
         dstFiles.get(0).length() + " bytes.");
     return hash;
   }
-  
+
+  static MD5Hash handleUploadImageRequest(HttpServletRequest request,
+      long imageTxId, Storage dstStorage, InputStream stream,
+      long advertisedSize, DataTransferThrottler throttler) throws IOException {
+
+    String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
+
+    List<File> dstFiles = dstStorage.getFiles(NameNodeDirType.IMAGE, fileName);
+    if (dstFiles.isEmpty()) {
+      throw new IOException("No targets in destination storage!");
+    }
+
+    MD5Hash advertisedDigest = parseMD5Header(request);
+    MD5Hash hash = receiveFile(fileName, dstFiles, dstStorage, true,
+        advertisedSize, advertisedDigest, fileName, stream, throttler);
+    LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size "
+        + dstFiles.get(0).length() + " bytes.");
+    return hash;
+  }
+
   static void downloadEditsToStorage(URL fsName, RemoteEditLog log,
       NNStorage dstStorage) throws IOException {
     assert log.getStartTxId() > 0 && log.getEndTxId() > 0 :
       "bad log: " + log;
-    String fileid = GetImageServlet.getParamStringForLog(
+    String fileid = ImageServlet.getParamStringForLog(
         log, dstStorage);
     String finalFileName = NNStorage.getFinalizedEditsFileName(
         log.getStartTxId(), log.getEndTxId());
@@ -159,22 +189,19 @@ public class TransferFsImage {
    * Requests that the NameNode download an image from this node.
    *
    * @param fsName the http address for the remote NN
-   * @param myNNAddress the host/port where the local node is running an
-   *                           HTTPServer hosting GetImageServlet
+   * @param conf Configuration
    * @param storage the storage directory to transfer the image from
    * @param nnf the NameNodeFile type of the image
    * @param txid the transaction ID of the image to be uploaded
    */
-  public static void uploadImageFromStorage(URL fsName, URL myNNAddress,
-      Storage storage, NameNodeFile nnf, long txid) throws IOException {
+  public static void uploadImageFromStorage(URL fsName, Configuration conf,
+      NNStorage storage, NameNodeFile nnf, long txid) throws IOException {
     
-    String fileid = GetImageServlet.getParamStringToPutImage(nnf, txid,
-        myNNAddress, storage);
-    // this doesn't directly upload an image, but rather asks the NN
-    // to connect back to the 2NN to download the specified image.
+    URL url = new URL(fsName, ImageServlet.PATH_SPEC);
+    long startTime = Time.monotonicNow();
     try {
-      TransferFsImage.getFileClient(fsName, fileid, null, null, false);
-    } catch (HttpGetFailedException e) {
+      uploadImage(url, conf, storage, nnf, txid);
+    } catch (HttpPutFailedException e) {
       if (e.getResponseCode() == HttpServletResponse.SC_CONFLICT) {
         // this is OK - this means that a previous attempt to upload
         // this checkpoint succeeded even though we thought it failed.
@@ -186,25 +213,105 @@ public class TransferFsImage {
         throw e;
       }
     }
-    LOG.info("Uploaded image with txid " + txid + " to namenode at " +
-    		fsName);
+    double xferSec = Math.max(
+        ((float) (Time.monotonicNow() - startTime)) / 1000.0, 0.001);
+    LOG.info("Uploaded image with txid " + txid + " to namenode at " + fsName
+        + " in " + xferSec + " seconds");
+  }
+
+  /*
+   * Uploads the imagefile using HTTP PUT method
+   */
+  private static void uploadImage(URL url, Configuration conf,
+      NNStorage storage, NameNodeFile nnf, long txId) throws IOException {
+
+    File imageFile = storage.findImageFile(nnf, txId);
+    if (imageFile == null) {
+      throw new IOException("Could not find image with txid " + txId);
+    }
+
+    HttpURLConnection connection = null;
+    try {
+      URIBuilder uriBuilder = new URIBuilder(url.toURI());
+
+      // write all params for image upload request as query itself.
+      // Request body contains the image to be uploaded.
+      Map<String, String> params = ImageServlet.getParamsForPutImage(storage,
+          txId, imageFile.length(), nnf);
+      for (Entry<String, String> entry : params.entrySet()) {
+        uriBuilder.addParameter(entry.getKey(), entry.getValue());
+      }
+
+      URL urlWithParams = uriBuilder.build().toURL();
+      connection = (HttpURLConnection) connectionFactory.openConnection(
+          urlWithParams, UserGroupInformation.isSecurityEnabled());
+      // Set the request to PUT
+      connection.setRequestMethod("PUT");
+      connection.setDoOutput(true);
+
+      
+      int chunkSize = conf.getInt(
+          DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY,
+          DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT);
+      if (imageFile.length() > chunkSize) {
+        // using chunked streaming mode to support upload of 2GB+ files and to
+        // avoid internal buffering.
+        // this mode should be used only if more than chunkSize data is present
+        // to upload. otherwise upload may not happen sometimes.
+        connection.setChunkedStreamingMode(chunkSize);
+      }
+
+      setTimeout(connection);
+
+      // set headers for verification
+      ImageServlet.setVerificationHeadersForPut(connection, imageFile);
+
+      // Write the file to output stream.
+      writeFileToPutRequest(conf, connection, imageFile);
+
+      int responseCode = connection.getResponseCode();
+      if (responseCode != HttpURLConnection.HTTP_OK) {
+        throw new HttpPutFailedException(connection.getResponseMessage(),
+            responseCode);
+      }
+    } catch (AuthenticationException e) {
+      throw new IOException(e);
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    } finally {
+      if (connection != null) {
+        connection.disconnect();
+      }
+    }
+  }
+
+  private static void writeFileToPutRequest(Configuration conf,
+      HttpURLConnection connection, File imageFile)
+      throws FileNotFoundException, IOException {
+    connection.setRequestProperty(CONTENT_TYPE, "application/octet-stream");
+    connection.setRequestProperty(CONTENT_TRANSFER_ENCODING, "binary");
+    OutputStream output = connection.getOutputStream();
+    FileInputStream input = new FileInputStream(imageFile);
+    try {
+      copyFileToStream(output, imageFile, input,
+          ImageServlet.getThrottler(conf));
+    } finally {
+      IOUtils.closeStream(input);
+      IOUtils.closeStream(output);
+    }
   }
 
-  
   /**
    * A server-side method to respond to a getfile http request
    * Copies the contents of the local file into the output stream.
    */
-  public static void getFileServer(ServletResponse response, File localfile,
-      FileInputStream infile,
-      DataTransferThrottler throttler) 
+  public static void copyFileToStream(OutputStream out, File localfile,
+      FileInputStream infile, DataTransferThrottler throttler)
     throws IOException {
     byte buf[] = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
-    ServletOutputStream out = null;
     try {
       CheckpointFaultInjector.getInstance()
           .aboutToSendFile(localfile);
-      out = response.getOutputStream();
 
       if (CheckpointFaultInjector.getInstance().
             shouldSendShortFile(localfile)) {
@@ -250,14 +357,13 @@ public class TransferFsImage {
   static MD5Hash getFileClient(URL infoServer,
       String queryString, List<File> localPaths,
       Storage dstStorage, boolean getChecksum) throws IOException {
-    URL url = new URL(infoServer, "/getimage?" + queryString);
+    URL url = new URL(infoServer, ImageServlet.PATH_SPEC + "?" + queryString);
     LOG.info("Opening connection to " + url);
     return doGetUrl(url, localPaths, dstStorage, getChecksum);
   }
   
   public static MD5Hash doGetUrl(URL url, List<File> localPaths,
       Storage dstStorage, boolean getChecksum) throws IOException {
-    long startTime = Time.monotonicNow();
     HttpURLConnection connection;
     try {
       connection = (HttpURLConnection)
@@ -266,16 +372,7 @@ public class TransferFsImage {
       throw new IOException(e);
     }
 
-    if (timeout <= 0) {
-      Configuration conf = new HdfsConfiguration();
-      timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
-          DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
-    }
-
-    if (timeout > 0) {
-      connection.setConnectTimeout(timeout);
-      connection.setReadTimeout(timeout);
-    }
+    setTimeout(connection);
 
     if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
       throw new HttpGetFailedException(
@@ -293,10 +390,37 @@ public class TransferFsImage {
       throw new IOException(CONTENT_LENGTH + " header is not provided " +
                             "by the namenode when trying to fetch " + url);
     }
-    
+    MD5Hash advertisedDigest = parseMD5Header(connection);
+    String fsImageName = connection
+        .getHeaderField(ImageServlet.HADOOP_IMAGE_EDITS_HEADER);
+    InputStream stream = connection.getInputStream();
+
+    return receiveFile(url.toExternalForm(), localPaths, dstStorage,
+        getChecksum, advertisedSize, advertisedDigest, fsImageName, stream,
+        null);
+  }
+
+  private static void setTimeout(HttpURLConnection connection) {
+    if (timeout <= 0) {
+      Configuration conf = new HdfsConfiguration();
+      timeout = conf.getInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY,
+          DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT);
+      LOG.info("Image Transfer timeout configured to " + timeout
+          + " milliseconds");
+    }
+
+    if (timeout > 0) {
+      connection.setConnectTimeout(timeout);
+      connection.setReadTimeout(timeout);
+    }
+  }
+
+  private static MD5Hash receiveFile(String url, List<File> localPaths,
+      Storage dstStorage, boolean getChecksum, long advertisedSize,
+      MD5Hash advertisedDigest, String fsImageName, InputStream stream,
+      DataTransferThrottler throttler) throws IOException {
+    long startTime = Time.monotonicNow();
     if (localPaths != null) {
-      String fsImageName = connection.getHeaderField(
-          GetImageServlet.HADOOP_IMAGE_EDITS_HEADER);
       // If the local paths refer to directories, use the server-provided header
       // as the filename within that directory
       List<File> newLocalPaths = new ArrayList<File>();
@@ -313,10 +437,8 @@ public class TransferFsImage {
       localPaths = newLocalPaths;
     }
     
-    MD5Hash advertisedDigest = parseMD5Header(connection);
 
     long received = 0;
-    InputStream stream = connection.getInputStream();
     MessageDigest digester = null;
     if (getChecksum) {
       digester = MD5Hash.getDigester();
@@ -361,6 +483,9 @@ public class TransferFsImage {
           for (FileOutputStream fos : outputStreams) {
             fos.write(buf, 0, num);
           }
+          if (throttler != null) {
+            throttler.throttle(num);
+          }
         }
       }
       finishedReceiving = true;
@@ -404,7 +529,12 @@ public class TransferFsImage {
     String header = connection.getHeaderField(MD5_HEADER);
     return (header != null) ? new MD5Hash(header) : null;
   }
-  
+
+  private static MD5Hash parseMD5Header(HttpServletRequest request) {
+    String header = request.getHeader(MD5_HEADER);
+    return (header != null) ? new MD5Hash(header) : null;
+  }
+
   public static class HttpGetFailedException extends IOException {
     private static final long serialVersionUID = 1L;
     private final int responseCode;
@@ -419,4 +549,18 @@ public class TransferFsImage {
     }
   }
 
+  public static class HttpPutFailedException extends IOException {
+    private static final long serialVersionUID = 1L;
+    private final int responseCode;
+
+    HttpPutFailedException(String msg, int responseCode) throws IOException {
+      super(msg);
+      this.responseCode = responseCode;
+    }
+
+    public int getResponseCode() {
+      return responseCode;
+    }
+  }
+
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java?rev=1576157&r1=1576156&r2=1576157&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java Tue Mar 11 00:44:56 2014
@@ -63,6 +63,7 @@ public class StandbyCheckpointer {
   private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class);
   private static final long PREVENT_AFTER_CANCEL_MS = 2*60*1000L;
   private final CheckpointConf checkpointConf;
+  private final Configuration conf;
   private final FSNamesystem namesystem;
   private long lastCheckpointTime;
   private final CheckpointerThread thread;
@@ -80,6 +81,7 @@ public class StandbyCheckpointer {
   public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
       throws IOException {
     this.namesystem = ns;
+    this.conf = conf;
     this.checkpointConf = new CheckpointConf(conf); 
     this.thread = new CheckpointerThread();
     this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
@@ -193,7 +195,7 @@ public class StandbyCheckpointer {
     Future<Void> upload = executor.submit(new Callable<Void>() {
       @Override
       public Void call() throws IOException {
-        TransferFsImage.uploadImageFromStorage(activeNNAddress, myNNAddress,
+        TransferFsImage.uploadImageFromStorage(activeNNAddress, conf,
             namesystem.getFSImage().getStorage(), imageType, txid);
         return null;
       }

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1575611

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1576157&r1=1576156&r2=1576157&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Tue Mar 11 00:44:56 2014
@@ -858,15 +858,13 @@
 
 <property>
   <name>dfs.image.transfer.timeout</name>
-  <value>600000</value>
+  <value>60000</value>
   <description>
-        Timeout for image transfer in milliseconds. This timeout and the related
+        Socket timeout for image transfer in milliseconds. This timeout and the related
         dfs.image.transfer.bandwidthPerSec parameter should be configured such
-        that normal image transfer can complete within the timeout.
+        that normal image transfer can complete successfully.
         This timeout prevents client hangs when the sender fails during
-        image transfer, which is particularly important during checkpointing.
-        Note that this timeout applies to the entirety of image transfer, and
-        is not a socket timeout.
+        image transfer. This is socket timeout during image tranfer.
   </description>
 </property>
 
@@ -884,6 +882,16 @@
 </property>
 
 <property>
+  <name>dfs.image.transfer.chunksize</name>
+  <value>65536</value>
+  <description>
+        Chunksize in bytes to upload the checkpoint.
+        Chunked streaming is used to avoid internal buffering of contents
+        of image file of huge size.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.support.allow.format</name>
   <value>true</value>
   <description>Does HDFS namenode allow itself to be formatted?
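
Both image-transfer properties above can be overridden in hdfs-site.xml or set programmatically before checkpointing starts. An illustrative override (the values are examples, not recommendations); note that, per the uploadImage() hunk earlier in this commit, chunked streaming is only enabled when the image file is larger than the configured chunk size:

    // Illustrative only: tightening the new image-transfer knobs in code.
    Configuration conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_TIMEOUT_KEY, 2 * 60 * 1000);   // 2 min socket timeout
    conf.setInt(DFSConfigKeys.DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY, 1024 * 1024);   // 1 MB chunks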

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1575611

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1575611

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1575611

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1575611

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java?rev=1576157&r1=1576156&r2=1576157&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java Tue Mar 11 00:44:56 2014
@@ -31,6 +31,7 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -564,23 +565,9 @@ public class TestCheckpoint {
   }
 
   /**
-   * Simulate a secondary node failure to transfer image
-   * back to the name-node.
-   * Used to truncate primary fsimage file.
-   */
-  @Test
-  public void testSecondaryFailsToReturnImage() throws IOException {
-    Mockito.doThrow(new IOException("If this exception is not caught by the " +
-        "name-node, fs image will be truncated."))
-        .when(faultInjector).aboutToSendFile(filePathContaining("secondary"));
-
-    doSecondaryFailsToReturnImage();
-  }
-  
-  /**
-   * Similar to above test, but uses an unchecked Error, and causes it
-   * before even setting the length header. This used to cause image
-   * truncation. Regression test for HDFS-3330.
+   * Simulate a secondary node failure to transfer image. Uses an unchecked
+   * error and fail transfer before even setting the length header. This used to
+   * cause image truncation. Regression test for HDFS-3330.
    */
   @Test
   public void testSecondaryFailsWithErrorBeforeSettingHeaders()
@@ -1978,7 +1965,14 @@ public class TestCheckpoint {
       Mockito.doReturn(Lists.newArrayList(new File("/wont-be-written")))
         .when(dstImage).getFiles(
             Mockito.<NameNodeDirType>anyObject(), Mockito.anyString());
-      
+
+      File mockImageFile = File.createTempFile("image", "");
+      FileOutputStream imageFile = new FileOutputStream(mockImageFile);
+      imageFile.write("data".getBytes());
+      imageFile.close();
+      Mockito.doReturn(mockImageFile).when(dstImage)
+          .findImageFile(Mockito.any(NameNodeFile.class), Mockito.anyLong());
+
       Mockito.doReturn(new StorageInfo(1, 1, "X", 1, NodeType.NAME_NODE).toColonSeparatedString())
         .when(dstImage).toColonSeparatedString();
 
@@ -1999,8 +1993,8 @@ public class TestCheckpoint {
       }
 
       try {
-        TransferFsImage.uploadImageFromStorage(fsName, new URL(
-            "http://localhost:1234"), dstImage, NameNodeFile.IMAGE, 0);
+        TransferFsImage.uploadImageFromStorage(fsName, conf, dstImage,
+            NameNodeFile.IMAGE, 0);
         fail("Storage info was not verified");
       } catch (IOException ioe) {
         String msg = StringUtils.stringifyException(ioe);

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java?rev=1576157&r1=1576156&r2=1576157&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGetImageServlet.java Tue Mar 11 00:44:56 2014
@@ -69,7 +69,7 @@ public class TestGetImageServlet {
     Mockito.when(context.getAttribute(HttpServer2.ADMINS_ACL)).thenReturn(acls);
     
     // Make sure that NN2 is considered a valid fsimage/edits requestor.
-    assertTrue(GetImageServlet.isValidRequestor(context,
+    assertTrue(ImageServlet.isValidRequestor(context,
         "hdfs/host2@TEST-REALM.COM", conf));
     
     // Mark atm as an admin.
@@ -81,15 +81,15 @@ public class TestGetImageServlet {
     }))).thenReturn(true);
     
     // Make sure that NN2 is still considered a valid requestor.
-    assertTrue(GetImageServlet.isValidRequestor(context,
+    assertTrue(ImageServlet.isValidRequestor(context,
         "hdfs/host2@TEST-REALM.COM", conf));
     
     // Make sure an admin is considered a valid requestor.
-    assertTrue(GetImageServlet.isValidRequestor(context,
+    assertTrue(ImageServlet.isValidRequestor(context,
         "atm@TEST-REALM.COM", conf));
     
     // Make sure other users are *not* considered valid requestors.
-    assertFalse(GetImageServlet.isValidRequestor(context,
+    assertFalse(ImageServlet.isValidRequestor(context,
         "todd@TEST-REALM.COM", conf));
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java?rev=1576157&r1=1576156&r2=1576157&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestTransferFsImage.java Tue Mar 11 00:44:56 2014
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.net.URL;
@@ -34,9 +35,11 @@ import javax.servlet.http.HttpServletReq
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.http.HttpServerFunctionalTest;
 import org.apache.hadoop.test.PathUtils;
@@ -118,10 +121,11 @@ public class TestTransferFsImage {
    * Test to verify the read timeout
    */
   @Test(timeout = 5000)
-  public void testImageTransferTimeout() throws Exception {
+  public void testGetImageTimeout() throws Exception {
     HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
     try {
-      testServer.addServlet("GetImage", "/getimage", TestGetImageServlet.class);
+      testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
+          TestImageTransferServlet.class);
       testServer.start();
       URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
       TransferFsImage.timeout = 2000;
@@ -139,7 +143,48 @@ public class TestTransferFsImage {
     }
   }
 
-  public static class TestGetImageServlet extends HttpServlet {
+  /**
+   * Test to verify the timeout of Image upload
+   */
+  @Test(timeout = 10000)
+  public void testImageUploadTimeout() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    NNStorage mockStorage = Mockito.mock(NNStorage.class);
+    HttpServer2 testServer = HttpServerFunctionalTest.createServer("hdfs");
+    try {
+      testServer.addServlet("ImageTransfer", ImageServlet.PATH_SPEC,
+          TestImageTransferServlet.class);
+      testServer.start();
+      URL serverURL = HttpServerFunctionalTest.getServerURL(testServer);
+      // set the timeout here, otherwise it will take default.
+      TransferFsImage.timeout = 2000;
+
+      File tmpDir = new File(new FileSystemTestHelper().getTestRootDir());
+      tmpDir.mkdirs();
+
+      File mockImageFile = File.createTempFile("image", "", tmpDir);
+      FileOutputStream imageFile = new FileOutputStream(mockImageFile);
+      imageFile.write("data".getBytes());
+      imageFile.close();
+      Mockito.when(
+          mockStorage.findImageFile(Mockito.any(NameNodeFile.class),
+              Mockito.anyLong())).thenReturn(mockImageFile);
+      Mockito.when(mockStorage.toColonSeparatedString()).thenReturn(
+          "storage:info:string");
+      
+      try {
+        TransferFsImage.uploadImageFromStorage(serverURL, conf, mockStorage,
+            NameNodeFile.IMAGE, 1L);
+        fail("TransferImage Should fail with timeout");
+      } catch (SocketTimeoutException e) {
+        assertEquals("Upload should timeout", "Read timed out", e.getMessage());
+      }
+    } finally {
+      testServer.stop();
+    }
+  }
+
+  public static class TestImageTransferServlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
 
     @Override
@@ -153,5 +198,17 @@ public class TestTransferFsImage {
         }
       }
     }
+
+    @Override
+    protected void doPut(HttpServletRequest req, HttpServletResponse resp)
+        throws ServletException, IOException {
+      synchronized (this) {
+        try {
+          wait(5000);
+        } catch (InterruptedException e) {
+          // Ignore
+        }
+      }
+    }
   }
 }


