hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1399950 [6/27] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/dev-support/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apach...
Date Fri, 19 Oct 2012 02:28:07 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperConfiguration.java Fri Oct 19 02:25:55 2012
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.Random;
 
 import org.apache.bookkeeper.util.LocalBookKeeper;
 import org.apache.commons.logging.Log;
@@ -42,6 +43,8 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
 public class TestBookKeeperConfiguration {
   private static final Log LOG = LogFactory
       .getLog(TestBookKeeperConfiguration.class);
@@ -73,6 +76,11 @@ public class TestBookKeeperConfiguration
     return zkc;
   }
 
+  private NamespaceInfo newNSInfo() {
+    Random r = new Random();
+    return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
+  }
+
   @BeforeClass
   public static void setupZooKeeper() throws Exception {
     // create a ZooKeeper server(dataDir, dataLogDir, port)
@@ -137,8 +145,10 @@ public class TestBookKeeperConfiguration
         bkAvailablePath);
     Assert.assertNull(bkAvailablePath + " already exists", zkc.exists(
         bkAvailablePath, false));
-    bkjm = new BookKeeperJournalManager(conf, URI.create("bookkeeper://"
-        + HOSTPORT + "/hdfsjournal-WithBKPath"));
+    NamespaceInfo nsi = newNSInfo();
+    bkjm = new BookKeeperJournalManager(conf,
+        URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-WithBKPath"),
+        nsi);
     Assert.assertNotNull("Bookie available path : " + bkAvailablePath
         + " doesn't exists", zkc.exists(bkAvailablePath, false));
   }
@@ -152,8 +162,10 @@ public class TestBookKeeperConfiguration
     Configuration conf = new Configuration();
     Assert.assertNull(BK_ROOT_PATH + " already exists", zkc.exists(
         BK_ROOT_PATH, false));
-    new BookKeeperJournalManager(conf, URI.create("bookkeeper://" + HOSTPORT
-        + "/hdfsjournal-DefaultBKPath"));
+    NamespaceInfo nsi = newNSInfo();
+    bkjm = new BookKeeperJournalManager(conf,
+        URI.create("bookkeeper://" + HOSTPORT + "/hdfsjournal-DefaultBKPath"),
+        nsi);
     Assert.assertNotNull("Bookie available path : " + BK_ROOT_PATH
         + " doesn't exists", zkc.exists(BK_ROOT_PATH, false));
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperJournalManager.java Fri Oct 19 02:25:55 2012
@@ -29,6 +29,7 @@ import org.mockito.Mockito;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -37,6 +38,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.zookeeper.CreateMode;
@@ -78,10 +80,17 @@ public class TestBookKeeperJournalManage
     zkc.close();
   }
 
+  private NamespaceInfo newNSInfo() {
+    Random r = new Random();
+    return new NamespaceInfo(r.nextInt(), "testCluster", "TestBPID", -1);
+  }
+
   @Test
   public void testSimpleWrite() throws Exception {
+    NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"));
+        BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
+
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
       FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -99,8 +108,10 @@ public class TestBookKeeperJournalManage
 
   @Test
   public void testNumberOfTransactions() throws Exception {
+    NamespaceInfo nsi = newNSInfo();
+
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-txncount"));
+        BKJMUtil.createJournalURI("/hdfsjournal-txncount"), nsi);
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
       FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -116,8 +127,10 @@ public class TestBookKeeperJournalManage
 
   @Test 
   public void testNumberOfTransactionsWithGaps() throws Exception {
+    NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-gaps"));
+        BKJMUtil.createJournalURI("/hdfsjournal-gaps"), nsi);
+
     long txid = 1;
     for (long i = 0; i < 3; i++) {
       long start = txid;
@@ -151,8 +164,10 @@ public class TestBookKeeperJournalManage
 
   @Test
   public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception {
+    NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"));
+        BKJMUtil.createJournalURI("/hdfsjournal-inprogressAtEnd"), nsi);
+
     long txid = 1;
     for (long i = 0; i < 3; i++) {
       long start = txid;
@@ -190,8 +205,10 @@ public class TestBookKeeperJournalManage
    */
   @Test
   public void testWriteRestartFrom1() throws Exception {
+    NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"));
+        BKJMUtil.createJournalURI("/hdfsjournal-restartFrom1"), nsi);
+
     long txid = 1;
     long start = txid;
     EditLogOutputStream out = bkjm.startLogSegment(txid);
@@ -245,11 +262,15 @@ public class TestBookKeeperJournalManage
   @Test
   public void testTwoWriters() throws Exception {
     long start = 1;
+    NamespaceInfo nsi = newNSInfo();
+
     BookKeeperJournalManager bkjm1 = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
+        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
+
     BookKeeperJournalManager bkjm2 = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"));
-    
+        BKJMUtil.createJournalURI("/hdfsjournal-dualWriter"), nsi);
+
+
     EditLogOutputStream out1 = bkjm1.startLogSegment(start);
     try {
       bkjm2.startLogSegment(start);
@@ -263,8 +284,11 @@ public class TestBookKeeperJournalManage
 
   @Test
   public void testSimpleRead() throws Exception {
+    NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-simpleread"));
+        BKJMUtil.createJournalURI("/hdfsjournal-simpleread"),
+        nsi);
+
     final long numTransactions = 10000;
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= numTransactions; i++) {
@@ -287,8 +311,11 @@ public class TestBookKeeperJournalManage
 
   @Test
   public void testSimpleRecovery() throws Exception {
+    NamespaceInfo nsi = newNSInfo();
     BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-        BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"));
+        BKJMUtil.createJournalURI("/hdfsjournal-simplerecovery"),
+        nsi);
+
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1 ; i <= 100; i++) {
       FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -334,8 +361,10 @@ public class TestBookKeeperJournalManage
       conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
                   ensembleSize);
       long txid = 1;
+      NamespaceInfo nsi = newNSInfo();
       BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-          BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"));
+          BKJMUtil.createJournalURI("/hdfsjournal-allbookiefailure"),
+          nsi);
       EditLogOutputStream out = bkjm.startLogSegment(txid);
 
       for (long i = 1 ; i <= 3; i++) {
@@ -416,8 +445,12 @@ public class TestBookKeeperJournalManage
       conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_QUORUM_SIZE,
                   ensembleSize);
       long txid = 1;
+
+      NamespaceInfo nsi = newNSInfo();
       BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
-          BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"));
+          BKJMUtil.createJournalURI("/hdfsjournal-onebookiefailure"),
+          nsi);
+
       EditLogOutputStream out = bkjm.startLogSegment(txid);
       for (long i = 1 ; i <= 3; i++) {
         FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
@@ -464,7 +497,9 @@ public class TestBookKeeperJournalManage
   @Test
   public void testEmptyInprogressNode() throws Exception {
     URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogress");
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+    NamespaceInfo nsi = newNSInfo();
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
+                                                                 nsi);
 
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1; i <= 100; i++) {
@@ -481,7 +516,7 @@ public class TestBookKeeperJournalManage
     String inprogressZNode = bkjm.inprogressZNode(101);
     zkc.setData(inprogressZNode, new byte[0], -1);
 
-    bkjm = new BookKeeperJournalManager(conf, uri);
+    bkjm = new BookKeeperJournalManager(conf, uri, nsi);
     try {
       bkjm.recoverUnfinalizedSegments();
       fail("Should have failed. There should be no way of creating"
@@ -489,7 +524,7 @@ public class TestBookKeeperJournalManage
     } catch (IOException e) {
       // correct behaviour
       assertTrue("Exception different than expected", e.getMessage().contains(
-          "Invalid ledger entry,"));
+          "Invalid/Incomplete data in znode"));
     } finally {
       bkjm.close();
     }
@@ -503,7 +538,9 @@ public class TestBookKeeperJournalManage
   @Test
   public void testCorruptInprogressNode() throws Exception {
     URI uri = BKJMUtil.createJournalURI("/hdfsjournal-corruptInprogress");
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+    NamespaceInfo nsi = newNSInfo();
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
+                                                                 nsi);
 
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1; i <= 100; i++) {
@@ -521,7 +558,7 @@ public class TestBookKeeperJournalManage
     String inprogressZNode = bkjm.inprogressZNode(101);
     zkc.setData(inprogressZNode, "WholeLottaJunk".getBytes(), -1);
 
-    bkjm = new BookKeeperJournalManager(conf, uri);
+    bkjm = new BookKeeperJournalManager(conf, uri, nsi);
     try {
       bkjm.recoverUnfinalizedSegments();
       fail("Should have failed. There should be no way of creating"
@@ -529,8 +566,7 @@ public class TestBookKeeperJournalManage
     } catch (IOException e) {
       // correct behaviour
       assertTrue("Exception different than expected", e.getMessage().contains(
-          "Invalid ledger entry,"));
-
+          "has no field named"));
     } finally {
       bkjm.close();
     }
@@ -544,7 +580,9 @@ public class TestBookKeeperJournalManage
   @Test
   public void testEmptyInprogressLedger() throws Exception {
     URI uri = BKJMUtil.createJournalURI("/hdfsjournal-emptyInprogressLedger");
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+    NamespaceInfo nsi = newNSInfo();
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
+                                                                 nsi);
 
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1; i <= 100; i++) {
@@ -559,7 +597,7 @@ public class TestBookKeeperJournalManage
     out.close();
     bkjm.close();
 
-    bkjm = new BookKeeperJournalManager(conf, uri);
+    bkjm = new BookKeeperJournalManager(conf, uri, nsi);
     bkjm.recoverUnfinalizedSegments();
     out = bkjm.startLogSegment(101);
     for (long i = 1; i <= 100; i++) {
@@ -581,7 +619,9 @@ public class TestBookKeeperJournalManage
   public void testRefinalizeAlreadyFinalizedInprogress() throws Exception {
     URI uri = BKJMUtil
         .createJournalURI("/hdfsjournal-refinalizeInprogressLedger");
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+    NamespaceInfo nsi = newNSInfo();
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
+                                                                 nsi);
 
     EditLogOutputStream out = bkjm.startLogSegment(1);
     for (long i = 1; i <= 100; i++) {
@@ -601,7 +641,7 @@ public class TestBookKeeperJournalManage
     byte[] inprogressData = zkc.getData(inprogressZNode, false, null);
 
     // finalize
-    bkjm = new BookKeeperJournalManager(conf, uri);
+    bkjm = new BookKeeperJournalManager(conf, uri, nsi);
     bkjm.recoverUnfinalizedSegments();
     bkjm.close();
 
@@ -613,7 +653,7 @@ public class TestBookKeeperJournalManage
         CreateMode.PERSISTENT);
 
     // should work fine
-    bkjm = new BookKeeperJournalManager(conf, uri);
+    bkjm = new BookKeeperJournalManager(conf, uri, nsi);
     bkjm.recoverUnfinalizedSegments();
     bkjm.close();
   }
@@ -626,7 +666,10 @@ public class TestBookKeeperJournalManage
   @Test
   public void testEditLogFileNotExistsWhenReadingMetadata() throws Exception {
     URI uri = BKJMUtil.createJournalURI("/hdfsjournal-editlogfile");
-    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri);
+    NamespaceInfo nsi = newNSInfo();
+    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, uri,
+                                                                 nsi);
+
     try {
       // start new inprogress log segment with txid=1
       // and write transactions till txid=50

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs Fri Oct 19 02:25:55 2012
@@ -17,7 +17,7 @@
 
 bin=`which $0`
 bin=`dirname ${bin}`
-bin=`cd "$bin"; pwd`
+bin=`cd "$bin" > /dev/null; pwd`
 
 DEFAULT_LIBEXEC_DIR="$bin"/../libexec
 HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
@@ -30,6 +30,7 @@ function print_usage(){
   echo "  namenode -format     format the DFS filesystem"
   echo "  secondarynamenode    run the DFS secondary namenode"
   echo "  namenode             run the DFS namenode"
+  echo "  journalnode          run the DFS journalnode"
   echo "  zkfc                 run the ZK Failover Controller daemon"
   echo "  datanode             run a DFS datanode"
   echo "  dfsadmin             run a DFS admin client"
@@ -90,6 +91,9 @@ elif [ "$COMMAND" = "datanode" ] ; then
   else
     HADOOP_OPTS="$HADOOP_OPTS -server $HADOOP_DATANODE_OPTS"
   fi
+elif [ "$COMMAND" = "journalnode" ] ; then
+  CLASS='org.apache.hadoop.hdfs.qjournal.server.JournalNode'
+  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_JOURNALNODE_OPTS"
 elif [ "$COMMAND" = "dfs" ] ; then
   CLASS=org.apache.hadoop.fs.FsShell
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.sh Fri Oct 19 02:25:55 2012
@@ -61,4 +61,14 @@ if [ -n "$SECONDARY_NAMENODES" ]; then
       --script "$bin/hdfs" stop secondarynamenode
 fi
 
+#---------------------------------------------------------
+# ZK Failover controllers, if auto-HA is enabled
+AUTOHA_ENABLED=$($HADOOP_PREFIX/bin/hdfs getconf -confKey dfs.ha.automatic-failover.enabled)
+if [ "$(echo "$AUTOHA_ENABLED" | tr A-Z a-z)" = "true" ]; then
+  echo "Stopping ZK Failover Controllers on NN hosts [$NAMENODES]"
+  "$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
+    --config "$HADOOP_CONF_DIR" \
+    --hostnames "$NAMENODES" \
+    --script "$bin/hdfs" stop zkfc
+fi
 # eof

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hadoop-metrics2.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hadoop-metrics2.properties?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hadoop-metrics2.properties (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/conf/hadoop-metrics2.properties Fri Oct 19 02:25:55 2012
@@ -19,7 +19,7 @@
 # See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
 
 *.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
-# default sampling period
+# default sampling period, in seconds
 *.period=10
 
 # The namenode-metrics.out will contain metrics from all context

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/faultinject_framework.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/faultinject_framework.xml?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/faultinject_framework.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/faultinject_framework.xml Fri Oct 19 02:25:55 2012
@@ -332,13 +332,12 @@ package org.apache.hadoop.fs;
 
 import org.junit.Test;
 import org.junit.Before;
-import junit.framework.TestCase;
 
-public class DemoFiTest extends TestCase {
+public class DemoFiTest {
   public static final String BLOCK_RECEIVER_FAULT="hdfs.datanode.BlockReceiver";
   @Override
   @Before
-  public void setUp(){
+  public void setUp() {
     //Setting up the test's environment as required
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/libhdfs.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/libhdfs.xml?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/libhdfs.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/libhdfs.xml Fri Oct 19 02:25:55 2012
@@ -1,110 +1,110 @@
-<?xml version="1.0"?>
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-
-<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN"
-          "http://forrest.apache.org/dtd/document-v20.dtd">
-
-<document>
-<header>
-<title>C API libhdfs</title>
-<meta name="http-equiv">Content-Type</meta>
-<meta name="content">text/html;</meta>
-<meta name="charset">utf-8</meta>
-</header>
-<body>
-<section>
-<title>Overview</title>
-
-<p>
-libhdfs is a JNI based C API for Hadoop's Distributed File System (HDFS).
-It provides C APIs to a subset of the HDFS APIs to manipulate HDFS files and
-the filesystem. libhdfs is part of the Hadoop distribution and comes 
-pre-compiled in ${HADOOP_PREFIX}/libhdfs/libhdfs.so .
-</p>
-
-</section>
-<section>
-<title>The APIs</title>
-
-<p>
-The libhdfs APIs are a subset of: <a href="api/org/apache/hadoop/fs/FileSystem.html" >hadoop fs APIs</a>.  
-</p>
-<p>
-The header file for libhdfs describes each API in detail and is available in ${HADOOP_PREFIX}/src/c++/libhdfs/hdfs.h
-</p>
-</section>
-<section>
-<title>A Sample Program</title>
-
-<source>
-#include "hdfs.h" 
-
-int main(int argc, char **argv) {
-
-    hdfsFS fs = hdfsConnect("default", 0);
-    const char* writePath = "/tmp/testfile.txt";
-    hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
-    if(!writeFile) {
-          fprintf(stderr, "Failed to open %s for writing!\n", writePath);
-          exit(-1);
-    }
-    char* buffer = "Hello, World!";
-    tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1);
-    if (hdfsFlush(fs, writeFile)) {
-           fprintf(stderr, "Failed to 'flush' %s\n", writePath); 
-          exit(-1);
-    }
-   hdfsCloseFile(fs, writeFile);
-}
-</source>
-</section>
-
-<section>
-<title>How To Link With The Library</title>
-<p>
-See the Makefile for hdfs_test.c in the libhdfs source directory (${HADOOP_PREFIX}/src/c++/libhdfs/Makefile) or something like:<br />
-gcc above_sample.c -I${HADOOP_PREFIX}/src/c++/libhdfs -L${HADOOP_PREFIX}/libhdfs -lhdfs -o above_sample
-</p>
-</section>
-<section>
-<title>Common Problems</title>
-<p>
-The most common problem is the CLASSPATH is not set properly when calling a program that uses libhdfs. 
-Make sure you set it to all the Hadoop jars needed to run Hadoop itself. Currently, there is no way to 
-programmatically generate the classpath, but a good bet is to include all the jar files in ${HADOOP_PREFIX} 
-and ${HADOOP_PREFIX}/lib as well as the right configuration directory containing hdfs-site.xml
-</p>
-</section>
-<section>
-<title>Thread Safe</title>
-<p>libdhfs is thread safe.</p>
-<ul>
-<li>Concurrency and Hadoop FS "handles" 
-<br />The Hadoop FS implementation includes a FS handle cache which caches based on the URI of the 
-namenode along with the user connecting. So, all calls to hdfsConnect will return the same handle but 
-calls to hdfsConnectAsUser with different users will return different handles.  But, since HDFS client 
-handles are completely thread safe, this has no bearing on concurrency. 
-</li>
-<li>Concurrency and libhdfs/JNI 
-<br />The libhdfs calls to JNI should always be creating thread local storage, so (in theory), libhdfs 
-should be as thread safe as the underlying calls to the Hadoop FS.
-</li>
-</ul>
-</section>
-</body>
-</document>
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN"
+          "http://forrest.apache.org/dtd/document-v20.dtd">
+
+<document>
+<header>
+<title>C API libhdfs</title>
+<meta name="http-equiv">Content-Type</meta>
+<meta name="content">text/html;</meta>
+<meta name="charset">utf-8</meta>
+</header>
+<body>
+<section>
+<title>Overview</title>
+
+<p>
+libhdfs is a JNI based C API for Hadoop's Distributed File System (HDFS).
+It provides C APIs to a subset of the HDFS APIs to manipulate HDFS files and
+the filesystem. libhdfs is part of the Hadoop distribution and comes 
+pre-compiled in ${HADOOP_PREFIX}/libhdfs/libhdfs.so .
+</p>
+
+</section>
+<section>
+<title>The APIs</title>
+
+<p>
+The libhdfs APIs are a subset of: <a href="api/org/apache/hadoop/fs/FileSystem.html" >hadoop fs APIs</a>.  
+</p>
+<p>
+The header file for libhdfs describes each API in detail and is available in ${HADOOP_PREFIX}/src/c++/libhdfs/hdfs.h
+</p>
+</section>
+<section>
+<title>A Sample Program</title>
+
+<source>
+#include "hdfs.h" 
+
+int main(int argc, char **argv) {
+
+    hdfsFS fs = hdfsConnect("default", 0);
+    const char* writePath = "/tmp/testfile.txt";
+    hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
+    if(!writeFile) {
+          fprintf(stderr, "Failed to open %s for writing!\n", writePath);
+          exit(-1);
+    }
+    char* buffer = "Hello, World!";
+    tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1);
+    if (hdfsFlush(fs, writeFile)) {
+           fprintf(stderr, "Failed to 'flush' %s\n", writePath); 
+          exit(-1);
+    }
+   hdfsCloseFile(fs, writeFile);
+}
+</source>
+</section>
+
+<section>
+<title>How To Link With The Library</title>
+<p>
+See the Makefile for hdfs_test.c in the libhdfs source directory (${HADOOP_PREFIX}/src/c++/libhdfs/Makefile) or something like:<br />
+gcc above_sample.c -I${HADOOP_PREFIX}/src/c++/libhdfs -L${HADOOP_PREFIX}/libhdfs -lhdfs -o above_sample
+</p>
+</section>
+<section>
+<title>Common Problems</title>
+<p>
+The most common problem is the CLASSPATH is not set properly when calling a program that uses libhdfs. 
+Make sure you set it to all the Hadoop jars needed to run Hadoop itself. Currently, there is no way to 
+programmatically generate the classpath, but a good bet is to include all the jar files in ${HADOOP_PREFIX} 
+and ${HADOOP_PREFIX}/lib as well as the right configuration directory containing hdfs-site.xml
+</p>
+</section>
+<section>
+<title>Thread Safe</title>
+<p>libhdfs is thread safe.</p>
+<ul>
+<li>Concurrency and Hadoop FS "handles" 
+<br />The Hadoop FS implementation includes a FS handle cache which caches based on the URI of the 
+namenode along with the user connecting. So, all calls to hdfsConnect will return the same handle but 
+calls to hdfsConnectAsUser with different users will return different handles.  But, since HDFS client 
+handles are completely thread safe, this has no bearing on concurrency. 
+</li>
+<li>Concurrency and libhdfs/JNI 
+<br />The libhdfs calls to JNI should always be creating thread local storage, so (in theory), libhdfs 
+should be as thread safe as the underlying calls to the Hadoop FS.
+</li>
+</ul>
+</section>
+</body>
+</document>

Propchange: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1363593-1396941
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1360400-1399945

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Fri Oct 19 02:25:55 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -93,10 +94,10 @@ public class Hdfs extends AbstractFileSy
   public HdfsDataOutputStream createInternal(Path f,
       EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
       int bufferSize, short replication, long blockSize, Progressable progress,
-      int bytesPerChecksum, boolean createParent) throws IOException {
+      ChecksumOpt checksumOpt, boolean createParent) throws IOException {
     return new HdfsDataOutputStream(dfs.primitiveCreate(getUriPath(f),
         absolutePermission, createFlag, createParent, replication, blockSize,
-        progress, bufferSize, bytesPerChecksum), getStatistics());
+        progress, bufferSize, checksumOpt), getStatistics());
   }
 
   @Override
@@ -311,9 +312,6 @@ public class Hdfs extends AbstractFileSy
     return listing.toArray(new FileStatus[listing.size()]);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public RemoteIterator<Path> listCorruptFileBlocks(Path path)
     throws IOException {
@@ -323,7 +321,7 @@ public class Hdfs extends AbstractFileSy
   @Override
   public void mkdir(Path dir, FsPermission permission, boolean createParent)
     throws IOException, UnresolvedLinkException {
-    dfs.mkdirs(getUriPath(dir), permission, createParent);
+    dfs.primitiveMkdir(getUriPath(dir), permission, createParent);
   }
 
   @SuppressWarnings("deprecation")

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Fri Oct 19 02:25:55 2012
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.Socket;
 
 import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 
 /**
  * A BlockReader is responsible for reading a single block
@@ -71,4 +72,8 @@ public interface BlockReader extends Byt
    */
   boolean hasSentStatusCode();
 
+  /**
+   * @return a reference to the streams this block reader is using.
+   */
+  IOStreamPair getStreams();
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Fri Oct 19 02:25:55 2012
@@ -25,7 +25,12 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 
 
@@ -41,12 +46,13 @@ public class BlockReaderFactory {
       Configuration conf,
       Socket sock, String file,
       ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
-      long startOffset, long len) throws IOException {
+      long startOffset, long len, DataEncryptionKey encryptionKey)
+          throws IOException {
     int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
         DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
     return newBlockReader(new Conf(conf),
         sock, file, block, blockToken, startOffset,
-        len, bufferSize, true, "");
+        len, bufferSize, true, "", encryptionKey, null);
   }
 
   /**
@@ -73,14 +79,32 @@ public class BlockReaderFactory {
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum,
-                                     String clientName)
+                                     String clientName,
+                                     DataEncryptionKey encryptionKey,
+                                     IOStreamPair ioStreams)
                                      throws IOException {
+    
     if (conf.useLegacyBlockReader) {
+      if (encryptionKey != null) {
+        throw new RuntimeException("Encryption is not supported with the legacy block reader.");
+      }
       return RemoteBlockReader.newBlockReader(
           sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
     } else {
+      if (ioStreams == null) {
+        ioStreams = new IOStreamPair(NetUtils.getInputStream(sock),
+            NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT));
+        if (encryptionKey != null) {
+          IOStreamPair encryptedStreams =
+              DataTransferEncryptor.getEncryptedStreams(
+                  ioStreams.out, ioStreams.in, encryptionKey);
+          ioStreams = encryptedStreams;
+        }
+      }
+      
       return RemoteBlockReader2.newBlockReader(
-          sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);      
+          sock, file, block, blockToken, startOffset, len, bufferSize,
+          verifyChecksum, clientName, encryptionKey, ioStreams);
     }
   }
   

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Fri Oct 19 02:25:55 2012
@@ -35,10 +35,12 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -84,11 +86,11 @@ class BlockReaderLocal implements BlockR
     }
 
     private synchronized ClientDatanodeProtocol getDatanodeProxy(
-        DatanodeInfo node, Configuration conf, int socketTimeout)
-        throws IOException {
+        DatanodeInfo node, Configuration conf, int socketTimeout,
+        boolean connectToDnViaHostname) throws IOException {
       if (proxy == null) {
         proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
-            socketTimeout);
+            socketTimeout, connectToDnViaHostname);
       }
       return proxy;
     }
@@ -154,14 +156,16 @@ class BlockReaderLocal implements BlockR
    */
   static BlockReaderLocal newBlockReader(Configuration conf, String file,
       ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
-      int socketTimeout, long startOffset, long length) throws IOException {
+      int socketTimeout, long startOffset, long length,
+      boolean connectToDnViaHostname) throws IOException {
 
     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
         .getIpcPort());
     // check the cache first
     BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
     if (pathinfo == null) {
-      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
+      pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token,
+          connectToDnViaHostname);
     }
 
     // check to see if the file exists. It may so happen that the
@@ -239,11 +243,12 @@ class BlockReaderLocal implements BlockR
   
   private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
       DatanodeInfo node, Configuration conf, int timeout,
-      Token<BlockTokenIdentifier> token) throws IOException {
+      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
+          throws IOException {
     LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
     BlockLocalPathInfo pathinfo = null;
     ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
-        conf, timeout);
+        conf, timeout, connectToDnViaHostname);
     try {
       // make RPC to local datanode to find local pathnames of blocks
       pathinfo = proxy.getBlockLocalPathInfo(blk, token);
@@ -285,7 +290,7 @@ class BlockReaderLocal implements BlockR
       long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
       throws IOException {
     this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
-        DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL, 4), false,
+        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
         dataIn, startOffset, null);
   }
 
@@ -315,23 +320,10 @@ class BlockReaderLocal implements BlockR
     boolean success = false;
     try {
       // Skip both input streams to beginning of the chunk containing startOffset
-      long toSkip = firstChunkOffset;
-      while (toSkip > 0) {
-        long skipped = dataIn.skip(toSkip);
-        if (skipped == 0) {
-          throw new IOException("Couldn't initialize input stream");
-        }
-        toSkip -= skipped;
-      }
+      IOUtils.skipFully(dataIn, firstChunkOffset);
       if (checksumIn != null) {
         long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
-        while (checkSumOffset > 0) {
-          long skipped = checksumIn.skip(checkSumOffset);
-          if (skipped == 0) {
-            throw new IOException("Couldn't initialize checksum input stream");
-          }
-          checkSumOffset -= skipped;
-        }
+        IOUtils.skipFully(checksumIn, checkSumOffset);
       }
       success = true;
     } finally {
@@ -636,17 +628,9 @@ class BlockReaderLocal implements BlockR
     slowReadBuff.position(slowReadBuff.limit());
     checksumBuff.position(checksumBuff.limit());
   
-    long dataSkipped = dataIn.skip(toskip);
-    if (dataSkipped != toskip) {
-      throw new IOException("skip error in data input stream");
-    }
-    long checkSumOffset = (dataSkipped / bytesPerChecksum) * checksumSize;
-    if (checkSumOffset > 0) {
-      long skipped = checksumIn.skip(checkSumOffset);
-      if (skipped != checkSumOffset) {
-        throw new IOException("skip error in checksum input stream");
-      }
-    }
+    IOUtils.skipFully(dataIn, toskip);
+    long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
+    IOUtils.skipFully(checksumIn, checkSumOffset);
 
     // read into the middle of the chunk
     if (skipBuf == null) {
@@ -701,4 +685,9 @@ class BlockReaderLocal implements BlockR
   public boolean hasSentStatusCode() {
     return false;
   }
-}
\ No newline at end of file
+
+  @Override
+  public IOStreamPair getStreams() {
+    return null;
+  }
+}

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Fri Oct 19 02:25:55 2012
@@ -22,12 +22,15 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
 
 import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HttpHeaders;
 
 /**
  * To support HTTP byte streams, a new connection to an HTTP server needs to be
@@ -57,9 +60,9 @@ public abstract class ByteRangeInputStre
       return url;
     }
 
-    protected abstract HttpURLConnection openConnection() throws IOException;
-
-    protected abstract HttpURLConnection openConnection(final long offset) throws IOException;
+    /** Connect to server with a data offset. */
+    protected abstract HttpURLConnection connect(final long offset,
+        final boolean resolved) throws IOException;
   }
 
   enum StreamStatus {
@@ -70,7 +73,7 @@ public abstract class ByteRangeInputStre
   protected URLOpener resolvedURL;
   protected long startPos = 0;
   protected long currentPos = 0;
-  protected long filelength;
+  protected Long fileLength = null;
 
   StreamStatus status = StreamStatus.SEEK;
 
@@ -85,9 +88,6 @@ public abstract class ByteRangeInputStre
     this.resolvedURL = r;
   }
   
-  protected abstract void checkResponseCode(final HttpURLConnection connection
-      ) throws IOException;
-  
   protected abstract URL getResolvedUrl(final HttpURLConnection connection
       ) throws IOException;
 
@@ -113,35 +113,64 @@ public abstract class ByteRangeInputStre
   protected InputStream openInputStream() throws IOException {
     // Use the original url if no resolved url exists, eg. if
     // it's the first time a request is made.
-    final URLOpener opener =
-      (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
-
-    final HttpURLConnection connection = opener.openConnection(startPos);
-    connection.connect();
-    checkResponseCode(connection);
-
-    final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
-    if (cl == null) {
-      throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
-    }
-    final long streamlength = Long.parseLong(cl);
-    filelength = startPos + streamlength;
-    // Java has a bug with >2GB request streams.  It won't bounds check
-    // the reads so the transfer blocks until the server times out
-    InputStream is =
-        new BoundedInputStream(connection.getInputStream(), streamlength);
+    final boolean resolved = resolvedURL.getURL() != null; 
+    final URLOpener opener = resolved? resolvedURL: originalURL;
 
+    final HttpURLConnection connection = opener.connect(startPos, resolved);
     resolvedURL.setURL(getResolvedUrl(connection));
-    
-    return is;
+
+    InputStream in = connection.getInputStream();
+    final Map<String, List<String>> headers = connection.getHeaderFields();
+    if (isChunkedTransferEncoding(headers)) {
+      // file length is not known
+      fileLength = null;
+    } else {
+      // for non-chunked transfer-encoding, get content-length
+      final String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
+      if (cl == null) {
+        throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
+            + headers);
+      }
+      final long streamlength = Long.parseLong(cl);
+      fileLength = startPos + streamlength;
+
+      // Java has a bug with >2GB request streams.  It won't bounds check
+      // the reads so the transfer blocks until the server times out
+      in = new BoundedInputStream(in, streamlength);
+    }
+
+    return in;
   }
   
+  private static boolean isChunkedTransferEncoding(
+      final Map<String, List<String>> headers) {
+    return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
+        || contains(headers, HttpHeaders.TE, "chunked");
+  }
+
+  /** Does the HTTP header map contain the given key, value pair? */
+  private static boolean contains(final Map<String, List<String>> headers,
+      final String key, final String value) {
+    final List<String> values = headers.get(key);
+    if (values != null) {
+      for(String v : values) {
+        for(final StringTokenizer t = new StringTokenizer(v, ",");
+            t.hasMoreTokens(); ) {
+          if (value.equalsIgnoreCase(t.nextToken())) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
   private int update(final int n) throws IOException {
     if (n != -1) {
       currentPos += n;
-    } else if (currentPos < filelength) {
+    } else if (fileLength != null && currentPos < fileLength) {
       throw new IOException("Got EOF but currentPos = " + currentPos
-          + " < filelength = " + filelength);
+          + " < filelength = " + fileLength);
     }
     return n;
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Oct 19 02:25:55 2012
@@ -39,6 +39,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
@@ -47,12 +49,15 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
 
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -64,6 +69,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -73,8 +79,10 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStorageLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -82,12 +90,17 @@ import org.apache.hadoop.fs.FileAlreadyE
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@@ -97,16 +110,18 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -114,9 +129,10 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -136,6 +152,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -173,11 +190,12 @@ public class DFSClient implements java.i
   final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
-  final LeaseRenewer leaserenewer;
+  private final String authority;
   final SocketCache socketCache;
   final Conf dfsClientConf;
   private Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
+  private DataEncryptionKey encryptionKey;
 
   /**
    * DFSClient configuration 
@@ -189,11 +207,11 @@ public class DFSClient implements java.i
     final int maxBlockAcquireFailures;
     final int confTime;
     final int ioBufferSize;
-    final int checksumType;
-    final int bytesPerChecksum;
+    final ChecksumOpt defaultChecksumOpt;
     final int writePacketSize;
     final int socketTimeout;
     final int socketCacheCapacity;
+    final long socketCacheExpiry;
     /** Wait time window (in msec) if BlockMissingException is caught */
     final int timeWindow;
     final int nCachedConnRetry;
@@ -205,6 +223,10 @@ public class DFSClient implements java.i
     final String taskId;
     final FsPermission uMask;
     final boolean useLegacyBlockReader;
+    final boolean connectToDnViaHostname;
+    final boolean getHdfsBlocksMetadataEnabled;
+    final int getFileBlockStorageLocationsNumThreads;
+    final int getFileBlockStorageLocationsTimeout;
 
     Conf(Configuration conf) {
       maxFailoverAttempts = conf.getInt(
@@ -225,9 +247,7 @@ public class DFSClient implements java.i
       ioBufferSize = conf.getInt(
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
-      checksumType = getChecksumType(conf);
-      bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
-          DFS_BYTES_PER_CHECKSUM_DEFAULT);
+      defaultChecksumOpt = getChecksumOptFromConf(conf);
       socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
           HdfsServerConstants.READ_TIMEOUT);
       /** dfs.write.packet.size is an internal config variable */
@@ -240,6 +260,8 @@ public class DFSClient implements java.i
       taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
       socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
           DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+      socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+          DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
       prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
           10 * defaultBlockSize);
       timeWindow = conf
@@ -255,26 +277,59 @@ public class DFSClient implements java.i
       useLegacyBlockReader = conf.getBoolean(
           DFS_CLIENT_USE_LEGACY_BLOCKREADER,
           DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
-    }
-
-    private int getChecksumType(Configuration conf) {
-      String checksum = conf.get(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
+      connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
+          DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+      getHdfsBlocksMetadataEnabled = conf.getBoolean(
+          DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
+          DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+      getFileBlockStorageLocationsNumThreads = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
+          DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT);
+      getFileBlockStorageLocationsTimeout = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
+          DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
+    }
+
+    private DataChecksum.Type getChecksumType(Configuration conf) {
+      final String checksum = conf.get(
+          DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
           DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
-      if ("CRC32".equals(checksum)) {
-        return DataChecksum.CHECKSUM_CRC32;
-      } else if ("CRC32C".equals(checksum)) {
-        return DataChecksum.CHECKSUM_CRC32C;
-      } else if ("NULL".equals(checksum)) {
-        return DataChecksum.CHECKSUM_NULL;
-      } else {
-        LOG.warn("Bad checksum type: " + checksum + ". Using default.");
-        return DataChecksum.CHECKSUM_CRC32C;
+      try {
+        return DataChecksum.Type.valueOf(checksum);
+      } catch(IllegalArgumentException iae) {
+        LOG.warn("Bad checksum type: " + checksum + ". Using default "
+            + DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
+        return DataChecksum.Type.valueOf(
+            DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT); 
       }
     }
 
-    private DataChecksum createChecksum() {
-      return DataChecksum.newDataChecksum(
-          checksumType, bytesPerChecksum);
+    // Construct a checksum option from conf
+    private ChecksumOpt getChecksumOptFromConf(Configuration conf) {
+      DataChecksum.Type type = getChecksumType(conf);
+      int bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
+          DFS_BYTES_PER_CHECKSUM_DEFAULT);
+      return new ChecksumOpt(type, bytesPerChecksum);
+    }
+
+    // create a DataChecksum with the default option.
+    private DataChecksum createChecksum() throws IOException {
+      return createChecksum(null);
+    }
+
+    private DataChecksum createChecksum(ChecksumOpt userOpt) 
+        throws IOException {
+      // Fill in any missing field with the default.
+      ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
+          defaultChecksumOpt, userOpt);
+      DataChecksum dataChecksum = DataChecksum.newDataChecksum(
+          myOpt.getChecksumType(),
+          myOpt.getBytesPerChecksum());
+      if (dataChecksum == null) {
+        throw new IOException("Invalid checksum type specified: "
+            + myOpt.getChecksumType().name());
+      }
+      return dataChecksum;
     }
   }
  
@@ -343,12 +398,9 @@ public class DFSClient implements java.i
     this.hdfsTimeout = Client.getTimeout(conf);
     this.ugi = UserGroupInformation.getCurrentUser();
     
-    final String authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
-    this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
-    this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
-    
-    this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
-    
+    this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
+    this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" + 
+        DFSUtil.getRandom().nextInt()  + "_" + Thread.currentThread().getId();
     
     if (rpcNamenode != null) {
       // This case is used for testing.
@@ -379,6 +431,8 @@ public class DFSClient implements java.i
       Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
       Joiner.on(',').join(localInterfaceAddrs) + "]");
     }
+    
+    this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
   }
 
   /**
@@ -466,6 +520,14 @@ public class DFSClient implements java.i
     return clientName;
   }
 
+  /**
+   * @return whether the client should use hostnames instead of IPs
+   *    when connecting to DataNodes
+   */
+  boolean connectToDnViaHostname() {
+    return dfsClientConf.connectToDnViaHostname;
+  }
+
   void checkOpen() throws IOException {
     if (!clientRunning) {
       IOException result = new IOException("Filesystem closed");
@@ -473,7 +535,30 @@ public class DFSClient implements java.i
     }
   }
 
-  /** Put a file. */
+  /** Return the lease renewer instance. The renewer thread won't start
+   *  until the first output stream is created. The same instance will
+   *  be returned until all output streams are closed.
+   */
+  public LeaseRenewer getLeaseRenewer() throws IOException {
+      return LeaseRenewer.getInstance(authority, ugi, this);
+  }
+
+  /** Get a lease and start automatic renewal */
+  private void beginFileLease(final String src, final DFSOutputStream out) 
+      throws IOException {
+    getLeaseRenewer().put(src, out, this);
+  }
+
+  /** Stop renewal of lease for the file. */
+  void endFileLease(final String src) throws IOException {
+    getLeaseRenewer().closeFile(src, this);
+  }
+    
+
+  /** Put a file. Only called from LeaseRenewer, where proper locking is
+   *  enforced to consistently update its local dfsclients array and 
+   *  client's filesBeingWritten map.
+   */
   void putFileBeingWritten(final String src, final DFSOutputStream out) {
     synchronized(filesBeingWritten) {
       filesBeingWritten.put(src, out);
@@ -486,7 +571,7 @@ public class DFSClient implements java.i
     }
   }
 
-  /** Remove a file. */
+  /** Remove a file. Only called from LeaseRenewer. */
   void removeFileBeingWritten(final String src) {
     synchronized(filesBeingWritten) {
       filesBeingWritten.remove(src);
@@ -517,7 +602,7 @@ public class DFSClient implements java.i
       if (filesBeingWritten.isEmpty()) {
         return;
       }
-      lastLeaseRenewal = System.currentTimeMillis();
+      lastLeaseRenewal = Time.now();
     }
   }
 
@@ -534,7 +619,7 @@ public class DFSClient implements java.i
         return true;
       } catch (IOException e) {
         // Abort if the lease has already expired. 
-        final long elapsed = System.currentTimeMillis() - getLastLeaseRenewal();
+        final long elapsed = Time.now() - getLastLeaseRenewal();
         if (elapsed > HdfsConstants.LEASE_SOFTLIMIT_PERIOD) {
           LOG.warn("Failed to renew lease for " + clientName + " for "
               + (elapsed/1000) + " seconds (>= soft-limit ="
@@ -561,7 +646,14 @@ public class DFSClient implements java.i
   void abort() {
     clientRunning = false;
     closeAllFilesBeingWritten(true);
-    socketCache.clear();
+
+    try {
+      // remove reference to this client and stop the renewer,
+      // if there are no more clients under the renewer.
+      getLeaseRenewer().closeClient(this);
+    } catch (IOException ioe) {
+       LOG.info("Exception occurred while aborting the client. " + ioe);
+    }
     closeConnectionToNamenode();
   }
 
@@ -596,18 +688,29 @@ public class DFSClient implements java.i
    * Close the file system, abandoning all of the leases and files being
    * created and close connections to the namenode.
    */
+  @Override
   public synchronized void close() throws IOException {
     if(clientRunning) {
       closeAllFilesBeingWritten(false);
-      socketCache.clear();
       clientRunning = false;
-      leaserenewer.closeClient(this);
+      getLeaseRenewer().closeClient(this);
       // close connections to the namenode
       closeConnectionToNamenode();
     }
   }
 
   /**
+   * Close all open streams, abandoning all of the leases and files being
+   * created.
+   * @param abort true to abort the streams instead of closing them gracefully
+   */
+  public void closeOutputStreams(boolean abort) {
+    if (clientRunning) {
+      closeAllFilesBeingWritten(abort);
+    }
+  }
+
+  /**
    * Get the default block size for this cluster
    * @return the default block size in bytes
    */
@@ -632,7 +735,7 @@ public class DFSClient implements java.i
    * @see ClientProtocol#getServerDefaults()
    */
   public FsServerDefaults getServerDefaults() throws IOException {
-    long now = System.currentTimeMillis();
+    long now = Time.now();
     if (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD) {
       serverDefaults = namenode.getServerDefaults();
       serverDefaultsLastUpdate = now;
@@ -690,12 +793,12 @@ public class DFSClient implements java.i
    */
   static BlockReader getLocalBlockReader(Configuration conf,
       String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
-      DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
-      throws InvalidToken, IOException {
+      DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock,
+      boolean connectToDnViaHostname) throws InvalidToken, IOException {
     try {
       return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
           chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
-              - offsetIntoBlock);
+              - offsetIntoBlock, connectToDnViaHostname);
     } catch (RemoteException re) {
       throw re.unwrapRemoteException(InvalidToken.class,
           AccessControlException.class);
@@ -891,7 +994,81 @@ public class DFSClient implements java.i
   public BlockLocation[] getBlockLocations(String src, long start, 
     long length) throws IOException, UnresolvedLinkException {
     LocatedBlocks blocks = getLocatedBlocks(src, start, length);
-    return DFSUtil.locatedBlocks2Locations(blocks);
+    BlockLocation[] locations =  DFSUtil.locatedBlocks2Locations(blocks);
+    HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
+    for (int i = 0; i < locations.length; i++) {
+      hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
+    }
+    return hdfsLocations;
+  }
+  
+  /**
+   * Get block location information about a list of {@link HdfsBlockLocation}.
+   * Used by {@link DistributedFileSystem#getFileBlockStorageLocations(List)} to
+   * get {@link BlockStorageLocation}s for blocks returned by
+   * {@link DistributedFileSystem#getFileBlockLocations(org.apache.hadoop.fs.FileStatus, long, long)}
+   * .
+   * 
+   * This is done by making a round of RPCs to the associated datanodes, asking
+   * for the volume of each block replica. The returned array of
+   * {@link BlockStorageLocation} exposes this information as a
+   * {@link VolumeId}.
+   * 
+   * @param blockLocations
+   *          target blocks on which to query volume location information
+   * @return volumeBlockLocations original block array augmented with additional
+   *         volume location information for each replica.
+   */
+  public BlockStorageLocation[] getBlockStorageLocations(
+      List<BlockLocation> blockLocations) throws IOException,
+      UnsupportedOperationException, InvalidBlockTokenException {
+    if (!getConf().getHdfsBlocksMetadataEnabled) {
+      throw new UnsupportedOperationException("Datanode-side support for " +
+          "getVolumeBlockLocations() must also be enabled in the client " +
+          "configuration.");
+    }
+    // Downcast blockLocations and fetch out required LocatedBlock(s)
+    List<LocatedBlock> blocks = new ArrayList<LocatedBlock>();
+    for (BlockLocation loc : blockLocations) {
+      if (!(loc instanceof HdfsBlockLocation)) {
+        throw new ClassCastException("DFSClient#getVolumeBlockLocations " +
+            "expected to be passed HdfsBlockLocations");
+      }
+      HdfsBlockLocation hdfsLoc = (HdfsBlockLocation) loc;
+      blocks.add(hdfsLoc.getLocatedBlock());
+    }
+    
+    // Re-group the LocatedBlocks to be grouped by datanodes, with the values
+    // a list of the LocatedBlocks on the datanode.
+    Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks = 
+        new LinkedHashMap<DatanodeInfo, List<LocatedBlock>>();
+    for (LocatedBlock b : blocks) {
+      for (DatanodeInfo info : b.getLocations()) {
+        if (!datanodeBlocks.containsKey(info)) {
+          datanodeBlocks.put(info, new ArrayList<LocatedBlock>());
+        }
+        List<LocatedBlock> l = datanodeBlocks.get(info);
+        l.add(b);
+      }
+    }
+        
+    // Make RPCs to the datanodes to get volume locations for its replicas
+    List<HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil
+        .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
+            getConf().getFileBlockStorageLocationsNumThreads,
+            getConf().getFileBlockStorageLocationsTimeout,
+            getConf().connectToDnViaHostname);
+    
+    // Regroup the returned VolumeId metadata to again be grouped by
+    // LocatedBlock rather than by datanode
+    Map<LocatedBlock, List<VolumeId>> blockVolumeIds = BlockStorageLocationUtil
+        .associateVolumeIdsWithBlocks(blocks, datanodeBlocks, metadatas);
+    
+    // Combine original BlockLocations with new VolumeId information
+    BlockStorageLocation[] volumeBlockLocations = BlockStorageLocationUtil
+        .convertToVolumeBlockLocations(blocks, blockVolumeIds);
+
+    return volumeBlockLocations;
   }
   
   public DFSInputStream open(String src) 
@@ -1002,12 +1179,13 @@ public class DFSClient implements java.i
     return create(src, FsPermission.getDefault(),
         overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
             : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
-        buffersize);
+        buffersize, null);
   }
 
   /**
    * Call {@link #create(String, FsPermission, EnumSet, boolean, short, 
-   * long, Progressable, int)} with <code>createParent</code> set to true.
+   * long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
+   *  set to true.
    */
   public DFSOutputStream create(String src, 
                              FsPermission permission,
@@ -1015,10 +1193,11 @@ public class DFSClient implements java.i
                              short replication,
                              long blockSize,
                              Progressable progress,
-                             int buffersize)
+                             int buffersize,
+                             ChecksumOpt checksumOpt)
       throws IOException {
     return create(src, permission, flag, true,
-        replication, blockSize, progress, buffersize);
+        replication, blockSize, progress, buffersize, checksumOpt);
   }
 
   /**
@@ -1036,6 +1215,7 @@ public class DFSClient implements java.i
    * @param blockSize maximum block size
    * @param progress interface for reporting client progress
    * @param buffersize underlying buffer size 
+   * @param checksumOpt checksum options
    * 
    * @return output stream
    * 
@@ -1049,8 +1229,8 @@ public class DFSClient implements java.i
                              short replication,
                              long blockSize,
                              Progressable progress,
-                             int buffersize)
-    throws IOException {
+                             int buffersize,
+                             ChecksumOpt checksumOpt) throws IOException {
     checkOpen();
     if (permission == null) {
       permission = FsPermission.getDefault();
@@ -1061,8 +1241,8 @@ public class DFSClient implements java.i
     }
     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
         src, masked, flag, createParent, replication, blockSize, progress,
-        buffersize, dfsClientConf.createChecksum());
-    leaserenewer.put(src, result, this);
+        buffersize, dfsClientConf.createChecksum(checksumOpt));
+    beginFileLease(src, result);
     return result;
   }
   
@@ -1099,20 +1279,18 @@ public class DFSClient implements java.i
                              long blockSize,
                              Progressable progress,
                              int buffersize,
-                             int bytesPerChecksum)
+                             ChecksumOpt checksumOpt)
       throws IOException, UnresolvedLinkException {
     checkOpen();
     CreateFlag.validate(flag);
     DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
     if (result == null) {
-      DataChecksum checksum = DataChecksum.newDataChecksum(
-          dfsClientConf.checksumType,
-          bytesPerChecksum);
+      DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
       result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
           flag, createParent, replication, blockSize, progress, buffersize,
           checksum);
     }
-    leaserenewer.put(src, result, this);
+    beginFileLease(src, result);
     return result;
   }
   
@@ -1198,7 +1376,7 @@ public class DFSClient implements java.i
           + src + " on client " + clientName);
     }
     final DFSOutputStream result = callAppend(stat, src, buffersize, progress);
-    leaserenewer.put(src, result, this);
+    beginFileLease(src, result);
     return result;
   }
 
@@ -1385,7 +1563,45 @@ public class DFSClient implements java.i
    */
   public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
     checkOpen();
-    return getFileChecksum(src, namenode, socketFactory, dfsClientConf.socketTimeout);    
+    return getFileChecksum(src, namenode, socketFactory,
+        dfsClientConf.socketTimeout, getDataEncryptionKey(),
+        dfsClientConf.connectToDnViaHostname);
+  }
+  
+  @InterfaceAudience.Private
+  public void clearDataEncryptionKey() {
+    LOG.debug("Clearing encryption key");
+    synchronized (this) {
+      encryptionKey = null;
+    }
+  }
+  
+  /**
+   * @return true if data sent between this client and DNs should be encrypted,
+   *         false otherwise.
+   * @throws IOException in the event of error communicating with the NN
+   */
+  boolean shouldEncryptData() throws IOException {
+    FsServerDefaults d = getServerDefaults();
+    return d == null ? false : d.getEncryptDataTransfer();
+  }
+  
+  @InterfaceAudience.Private
+  public DataEncryptionKey getDataEncryptionKey()
+      throws IOException {
+    if (shouldEncryptData()) {
+      synchronized (this) {
+        if (encryptionKey == null ||
+            (encryptionKey != null &&
+             encryptionKey.expiryDate < Time.now())) {
+          LOG.debug("Getting new encryption token from NN");
+          encryptionKey = namenode.getDataEncryptionKey();
+        }
+        return encryptionKey;
+      }
+    } else {
+      return null;
+    }
   }
 
   /**
@@ -1394,8 +1610,9 @@ public class DFSClient implements java.i
    * @return The checksum 
    */
   public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
-      ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
-      ) throws IOException {
+      ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
+      DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+      throws IOException {
     //get all block locations
     LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
     if (null == blockLocations) {
@@ -1403,7 +1620,8 @@ public class DFSClient implements java.i
     }
     List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
     final DataOutputBuffer md5out = new DataOutputBuffer();
-    int bytesPerCRC = 0;
+    int bytesPerCRC = -1;
+    DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
     long crcPerBlock = 0;
     boolean refetchBlocks = false;
     int lastRetriedIndex = -1;
@@ -1433,15 +1651,25 @@ public class DFSClient implements java.i
         try {
           //connect to a datanode
           sock = socketFactory.createSocket();
-          NetUtils.connect(sock,
-              NetUtils.createSocketAddr(datanodes[j].getXferAddr()),
-              timeout);
+          String dnAddr = datanodes[j].getXferAddr(connectToDnViaHostname);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Connecting to datanode " + dnAddr);
+          }
+          NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
           sock.setSoTimeout(timeout);
 
-          out = new DataOutputStream(
-              new BufferedOutputStream(NetUtils.getOutputStream(sock), 
-                                       HdfsConstants.SMALL_BUFFER_SIZE));
-          in = new DataInputStream(NetUtils.getInputStream(sock));
+          OutputStream unbufOut = NetUtils.getOutputStream(sock);
+          InputStream unbufIn = NetUtils.getInputStream(sock);
+          if (encryptionKey != null) {
+            IOStreamPair encryptedStreams =
+                DataTransferEncryptor.getEncryptedStreams(
+                    unbufOut, unbufIn, encryptionKey);
+            unbufOut = encryptedStreams.out;
+            unbufIn = encryptedStreams.in;
+          }
+          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+              HdfsConstants.SMALL_BUFFER_SIZE));
+          in = new DataInputStream(unbufIn);
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("write to " + datanodes[j] + ": "
@@ -1497,6 +1725,17 @@ public class DFSClient implements java.i
               checksumData.getMd5().toByteArray());
           md5.write(md5out);
           
+          // read crc-type
+          final DataChecksum.Type ct = HdfsProtoUtil.
+              fromProto(checksumData.getCrcType());
+          if (i == 0) { // first block
+            crcType = ct;
+          } else if (crcType != DataChecksum.Type.MIXED
+              && crcType != ct) {
+            // if crc types are mixed in a file
+            crcType = DataChecksum.Type.MIXED;
+          }
+
           done = true;
 
           if (LOG.isDebugEnabled()) {
@@ -1522,7 +1761,18 @@ public class DFSClient implements java.i
 
     //compute file MD5
     final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData()); 
-    return new MD5MD5CRC32FileChecksum(bytesPerCRC, crcPerBlock, fileMD5);
+    switch (crcType) {
+      case CRC32:
+        return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
+            crcPerBlock, fileMD5);
+      case CRC32C:
+        return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
+            crcPerBlock, fileMD5);
+      default:
+        // we should never get here since the validity was checked
+        // when getCrcType() was called above.
+        return null;
+    }
   }
 
   /**
@@ -1635,6 +1885,20 @@ public class DFSClient implements java.i
       throw re.unwrapRemoteException(AccessControlException.class);
     }
   }
+
+  /**
+   * Rolls the edit log on the active NameNode.
+   * @return the txid of the new log segment 
+   *
+   * @see ClientProtocol#rollEdits()
+   */
+  long rollEdits() throws AccessControlException, IOException {
+    try {
+      return namenode.rollEdits();
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class);
+    }
+  }
   
   /**
    * enable/disable restore failed storage.
@@ -1686,14 +1950,6 @@ public class DFSClient implements java.i
   }
 
   /**
-   * @see ClientProtocol#distributedUpgradeProgress(HdfsConstants.UpgradeAction)
-   */
-  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
-      throws IOException {
-    return namenode.distributedUpgradeProgress(action);
-  }
-
-  /**
    */
   @Deprecated
   public boolean mkdirs(String src) throws IOException {
@@ -1715,34 +1971,29 @@ public class DFSClient implements java.i
    */
   public boolean mkdirs(String src, FsPermission permission,
       boolean createParent) throws IOException {
-    checkOpen();
     if (permission == null) {
       permission = FsPermission.getDefault();
     }
     FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
-    if(LOG.isDebugEnabled()) {
-      LOG.debug(src + ": masked=" + masked);
-    }
-    try {
-      return namenode.mkdirs(src, masked, createParent);
-    } catch(RemoteException re) {
-      throw re.unwrapRemoteException(AccessControlException.class,
-                                     InvalidPathException.class,
-                                     FileAlreadyExistsException.class,
-                                     FileNotFoundException.class,
-                                     ParentNotDirectoryException.class,
-                                     SafeModeException.class,
-                                     NSQuotaExceededException.class,
-                                     UnresolvedPathException.class);
-    }
+    return primitiveMkdir(src, masked, createParent);
   }
-  
+
   /**
    * Same {{@link #mkdirs(String, FsPermission, boolean)} except
    * that the permissions has already been masked against umask.
    */
   public boolean primitiveMkdir(String src, FsPermission absPermission)
     throws IOException {
+    return primitiveMkdir(src, absPermission, true);
+  }
+
+  /**
+   * Same as {@link #mkdirs(String, FsPermission, boolean)} except
+   * that the permissions have already been masked against umask.
+   */
+  public boolean primitiveMkdir(String src, FsPermission absPermission, 
+    boolean createParent)
+    throws IOException {
     checkOpen();
     if (absPermission == null) {
       absPermission = 
@@ -1753,15 +2004,20 @@ public class DFSClient implements java.i
       LOG.debug(src + ": masked=" + absPermission);
     }
     try {
-      return namenode.mkdirs(src, absPermission, true);
+      return namenode.mkdirs(src, absPermission, createParent);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
+                                     InvalidPathException.class,
+                                     FileAlreadyExistsException.class,
+                                     FileNotFoundException.class,
+                                     ParentNotDirectoryException.class,
+                                     SafeModeException.class,
                                      NSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
                                      UnresolvedPathException.class);
     }
   }
-
+  
   /**
    * Get {@link ContentSummary} rooted at the specified directory.
    * @param path The string representation of the path
@@ -1833,10 +2089,7 @@ public class DFSClient implements java.i
   }
   
   boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr) {
-    if (shortCircuitLocalReads && isLocalAddress(targetAddr)) {
-      return true;
-    }
-    return false;
+    return shortCircuitLocalReads && isLocalAddress(targetAddr);
   }
 
   void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {



Mime
View raw message