hadoop-hdfs-commits mailing list archives

From t...@apache.org
Subject svn commit: r1367365 [2/5] - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/dev-support/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/ha...
Date Mon, 30 Jul 2012 23:31:51 GMT
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java Mon Jul 30 23:31:42 2012
@@ -342,19 +342,28 @@ public class HftpFileSystem extends File
       super(url);
     }
 
-    @Override
     protected HttpURLConnection openConnection() throws IOException {
       return (HttpURLConnection)URLUtils.openConnection(url);
     }
 
     /** Use HTTP Range header for specifying offset. */
     @Override
-    protected HttpURLConnection openConnection(final long offset) throws IOException {
+    protected HttpURLConnection connect(final long offset,
+        final boolean resolved) throws IOException {
       final HttpURLConnection conn = openConnection();
       conn.setRequestMethod("GET");
       if (offset != 0L) {
         conn.setRequestProperty("Range", "bytes=" + offset + "-");
       }
+      conn.connect();
+
+      //Expects HTTP_OK or HTTP_PARTIAL response codes. 
+      final int code = conn.getResponseCode();
+      if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
+        throw new IOException("HTTP_PARTIAL expected, received " + code);
+      } else if (offset == 0L && code != HttpURLConnection.HTTP_OK) {
+        throw new IOException("HTTP_OK expected, received " + code);
+      }
       return conn;
     }  
   }
@@ -368,22 +377,6 @@ public class HftpFileSystem extends File
       this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null));
     }
 
-    /** Expects HTTP_OK and HTTP_PARTIAL response codes. */
-    @Override
-    protected void checkResponseCode(final HttpURLConnection connection
-        ) throws IOException {
-      final int code = connection.getResponseCode();
-      if (startPos != 0 && code != HttpURLConnection.HTTP_PARTIAL) {
-        // We asked for a byte range but did not receive a partial content
-        // response...
-        throw new IOException("HTTP_PARTIAL expected, received " + code);
-      } else if (startPos == 0 && code != HttpURLConnection.HTTP_OK) {
-        // We asked for all bytes from the beginning but didn't receive a 200
-        // response (none of the other 2xx codes are valid here)
-        throw new IOException("HTTP_OK expected, received " + code);
-      }
-    }
-
     @Override
     protected URL getResolvedUrl(final HttpURLConnection connection) {
       return connection.getURL();

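The HftpFileSystem hunk above moves response-code validation out of the old checkResponseCode() override and into the opener's new connect(offset, resolved): a ranged read must come back as 206 Partial Content, an unranged read as 200 OK. A minimal, self-contained sketch of the same check against plain java.net follows; the class and method names (RangeGetSketch, openRanged) are invented for illustration.

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class RangeGetSketch {
      /** Open a GET, optionally ranged, and verify the status code up front. */
      static HttpURLConnection openRanged(URL url, long offset) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        if (offset != 0L) {
          // Ask the server for the bytes starting at 'offset'.
          conn.setRequestProperty("Range", "bytes=" + offset + "-");
        }
        conn.connect();

        final int code = conn.getResponseCode();
        if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
          // A ranged request must be answered with 206 Partial Content.
          throw new IOException("HTTP_PARTIAL expected, received " + code);
        } else if (offset == 0L && code != HttpURLConnection.HTTP_OK) {
          // A full read must be answered with 200 OK.
          throw new IOException("HTTP_OK expected, received " + code);
        }
        return conn;
      }
    }
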
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Mon Jul 30 23:31:42 2012
@@ -259,7 +259,7 @@ public class NameNodeProxies {
    *     
    * Note that dfs.client.retry.max < 0 is not allowed.
    */
-  private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) {
+  public static RetryPolicy getDefaultRetryPolicy(Configuration conf) {
     final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
     if (LOG.isDebugEnabled()) {
       LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
@@ -300,6 +300,13 @@ public class NameNodeProxies {
               + p.getClass().getSimpleName() + ", exception=" + e);
           return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
         }
+
+        @Override
+        public String toString() {
+          return "RetryPolicy[" + multipleLinearRandomRetry + ", "
+              + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
+              + "]";
+        }
       };
     }
   }
@@ -335,7 +342,7 @@ public class NameNodeProxies {
       boolean withRetries) throws IOException {
     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
 
-    final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf);
+    final RetryPolicy defaultPolicy = getDefaultRetryPolicy(conf);
     final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
         ClientNamenodeProtocolPB.class, version, address, ugi, conf,

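The NameNodeProxies hunk widens the private getDefaultRpcRetryPolicy() into the public getDefaultRetryPolicy() (reused by WebHdfsFileSystem further down) and gives the anonymous composite policy a toString(), so the "retry policy is ..." log lines become readable. A rough sketch of that shape, using a made-up MiniRetryPolicy interface rather than Hadoop's org.apache.hadoop.io.retry.RetryPolicy:

    /** Illustrative stand-in for Hadoop's RetryPolicy interface; not the real API. */
    interface MiniRetryPolicy {
      boolean shouldRetry(Exception e, int retries);
    }

    /** Routes remote-exception failures to one policy and everything else to another. */
    class CompositeRetryPolicy implements MiniRetryPolicy {
      private final Class<? extends Exception> remoteClass;
      private final MiniRetryPolicy remotePolicy;  // e.g. "try once, then fail"
      private final MiniRetryPolicy defaultPolicy; // e.g. multiple linear random retry

      CompositeRetryPolicy(Class<? extends Exception> remoteClass,
          MiniRetryPolicy remotePolicy, MiniRetryPolicy defaultPolicy) {
        this.remoteClass = remoteClass;
        this.remotePolicy = remotePolicy;
        this.defaultPolicy = defaultPolicy;
      }

      @Override
      public boolean shouldRetry(Exception e, int retries) {
        MiniRetryPolicy p = remoteClass.isInstance(e) ? remotePolicy : defaultPolicy;
        return p.shouldRetry(e, retries);
      }

      @Override
      public String toString() {
        // The added toString() is what makes the retry log message meaningful.
        return "RetryPolicy[" + defaultPolicy + ", " + remotePolicy + "]";
      }
    }
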
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Mon Jul 30 23:31:42 2012
@@ -487,12 +487,17 @@ public class JspHelper {
    */
   public static UserGroupInformation getDefaultWebUser(Configuration conf
                                                        ) throws IOException {
+    return UserGroupInformation.createRemoteUser(getDefaultWebUserName(conf));
+  }
+
+  private static String getDefaultWebUserName(Configuration conf
+      ) throws IOException {
     String user = conf.get(
         HADOOP_HTTP_STATIC_USER, DEFAULT_HADOOP_HTTP_STATIC_USER);
     if (user == null || user.length() == 0) {
       throw new IOException("Cannot determine UGI from request or conf");
     }
-    return UserGroupInformation.createRemoteUser(user);
+    return user;
   }
 
   private static InetSocketAddress getNNServiceAddress(ServletContext context,
@@ -538,65 +543,45 @@ public class JspHelper {
       HttpServletRequest request, Configuration conf,
       final AuthenticationMethod secureAuthMethod,
       final boolean tryUgiParameter) throws IOException {
-    final UserGroupInformation ugi;
+    UserGroupInformation ugi = null;
     final String usernameFromQuery = getUsernameFromQuery(request, tryUgiParameter);
     final String doAsUserFromQuery = request.getParameter(DoAsParam.NAME);
-
-    if(UserGroupInformation.isSecurityEnabled()) {
-      final String remoteUser = request.getRemoteUser();
-      String tokenString = request.getParameter(DELEGATION_PARAMETER_NAME);
+    final String remoteUser;
+   
+    if (UserGroupInformation.isSecurityEnabled()) {
+      remoteUser = request.getRemoteUser();
+      final String tokenString = request.getParameter(DELEGATION_PARAMETER_NAME);
       if (tokenString != null) {
-        Token<DelegationTokenIdentifier> token = 
-          new Token<DelegationTokenIdentifier>();
-        token.decodeFromUrlString(tokenString);
-        InetSocketAddress serviceAddress = getNNServiceAddress(context, request);
-        if (serviceAddress != null) {
-          SecurityUtil.setTokenService(token, serviceAddress);
-          token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
-        }
-        ByteArrayInputStream buf = new ByteArrayInputStream(token
-            .getIdentifier());
-        DataInputStream in = new DataInputStream(buf);
-        DelegationTokenIdentifier id = new DelegationTokenIdentifier();
-        id.readFields(in);
-        if (context != null) {
-          final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
-          if (nn != null) {
-            // Verify the token.
-            nn.getNamesystem().verifyToken(id, token.getPassword());
-          }
-        }
-        ugi = id.getUser();
-        if (ugi.getRealUser() == null) {
-          //non-proxy case
-          checkUsername(ugi.getShortUserName(), usernameFromQuery);
-          checkUsername(null, doAsUserFromQuery);
-        } else {
-          //proxy case
-          checkUsername(ugi.getRealUser().getShortUserName(), usernameFromQuery);
-          checkUsername(ugi.getShortUserName(), doAsUserFromQuery);
-          ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
-        }
-        ugi.addToken(token);
-        ugi.setAuthenticationMethod(AuthenticationMethod.TOKEN);
-      } else {
-        if(remoteUser == null) {
-          throw new IOException("Security enabled but user not " +
-                                "authenticated by filter");
-        }
-        final UserGroupInformation realUgi = UserGroupInformation.createRemoteUser(remoteUser);
-        checkUsername(realUgi.getShortUserName(), usernameFromQuery);
+        // Token-based connections need only verify the effective user, and
+        // disallow proxying to different user.  Proxy authorization checks
+        // are not required since the checks apply to issuing a token.
+        ugi = getTokenUGI(context, request, tokenString, conf);
+        checkUsername(ugi.getShortUserName(), usernameFromQuery);
+        checkUsername(ugi.getShortUserName(), doAsUserFromQuery);
+      } else if (remoteUser == null) {
+        throw new IOException(
+            "Security enabled but user not authenticated by filter");
+      }
+    } else {
+      // Security's not on, pull from url or use default web user
+      remoteUser = (usernameFromQuery == null)
+          ? getDefaultWebUserName(conf) // not specified in request
+          : usernameFromQuery;
+    }
+
+    if (ugi == null) { // security is off, or there's no token
+      ugi = UserGroupInformation.createRemoteUser(remoteUser);
+      checkUsername(ugi.getShortUserName(), usernameFromQuery);
+      if (UserGroupInformation.isSecurityEnabled()) {
         // This is not necessarily true, could have been auth'ed by user-facing
         // filter
-        realUgi.setAuthenticationMethod(secureAuthMethod);
-        ugi = initUGI(realUgi, doAsUserFromQuery, request, true, conf);
+        ugi.setAuthenticationMethod(secureAuthMethod);
+      }
+      if (doAsUserFromQuery != null) {
+        // create and attempt to authorize a proxy user
+        ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
+        ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
       }
-    } else { // Security's not on, pull from url
-      final UserGroupInformation realUgi = usernameFromQuery == null?
-          getDefaultWebUser(conf) // not specified in request
-          : UserGroupInformation.createRemoteUser(usernameFromQuery);
-      realUgi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
-      ugi = initUGI(realUgi, doAsUserFromQuery, request, false, conf);
     }
     
     if(LOG.isDebugEnabled())
@@ -604,21 +589,34 @@ public class JspHelper {
     return ugi;
   }
 
-  private static UserGroupInformation initUGI(final UserGroupInformation realUgi,
-      final String doAsUserFromQuery, final HttpServletRequest request,
-      final boolean isSecurityEnabled, final Configuration conf
-      ) throws AuthorizationException {
-    final UserGroupInformation ugi;
-    if (doAsUserFromQuery == null) {
-      //non-proxy case
-      ugi = realUgi;
-    } else {
-      //proxy case
-      ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, realUgi);
-      ugi.setAuthenticationMethod(
-          isSecurityEnabled? AuthenticationMethod.PROXY: AuthenticationMethod.SIMPLE);
-      ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
+  private static UserGroupInformation getTokenUGI(ServletContext context,
+                                                  HttpServletRequest request,
+                                                  String tokenString,
+                                                  Configuration conf)
+                                                      throws IOException {
+    final Token<DelegationTokenIdentifier> token =
+        new Token<DelegationTokenIdentifier>();
+    token.decodeFromUrlString(tokenString);
+    InetSocketAddress serviceAddress = getNNServiceAddress(context, request);
+    if (serviceAddress != null) {
+      SecurityUtil.setTokenService(token, serviceAddress);
+      token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+    }
+
+    ByteArrayInputStream buf =
+        new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    DelegationTokenIdentifier id = new DelegationTokenIdentifier();
+    id.readFields(in);
+    if (context != null) {
+      final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
+      if (nn != null) {
+        // Verify the token.
+        nn.getNamesystem().verifyToken(id, token.getPassword());
+      }
     }
+    UserGroupInformation ugi = id.getUser();
+    ugi.addToken(token);
     return ugi;
   }
 

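The JspHelper rewrite folds the old initUGI() into the main flow: a delegation token yields the effective user directly (no proxy authorization needed, since that was checked when the token was issued), while the non-token path builds a remote-user UGI and, only when a doAs parameter is present, wraps it in a proxy user and authorizes the proxying. Below is a stripped-down sketch of that non-token path, using the same UserGroupInformation and ProxyUsers calls that appear in the hunk; WebUgiSketch and the String remoteAddr parameter are invented, and the real code takes them from the HttpServletRequest.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.authorize.ProxyUsers;

    public class WebUgiSketch {
      /** Build the caller's UGI, then optionally wrap and authorize a proxy user. */
      static UserGroupInformation resolve(String remoteUser, String doAsUser,
          String remoteAddr, Configuration conf) throws IOException {
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
        if (doAsUser != null) {
          // create and attempt to authorize a proxy user, as the new code does
          ugi = UserGroupInformation.createProxyUser(doAsUser, ugi);
          ProxyUsers.authorize(ugi, remoteAddr, conf);
        }
        return ugi;
      }
    }
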
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Mon Jul 30 23:31:42 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -486,11 +487,14 @@ class BlockSender implements java.io.Clo
         
         // no need to flush since we know out is not a buffered stream
         FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
+        LongWritable waitTime = new LongWritable();
+        LongWritable transferTime = new LongWritable();
         sockOut.transferToFully(fileCh, blockInPosition, dataLen, 
-            datanode.metrics.getSendDataPacketBlockedOnNetworkNanos(),
-            datanode.metrics.getSendDataPacketTransferNanos());
+            waitTime, transferTime);
+        datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
+        datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
         blockInPosition += dataLen;
-      } else { 
+      } else {
         // normal transfer
         out.write(buf, 0, dataOff + dataLen);
       }

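The BlockSender hunk inverts a dependency: rather than handing MutableRate objects into transferToFully(), it passes LongWritable out-parameters and lets the caller push the measured nanoseconds into the metrics (which now also feed the new quantile windows below). A toy sketch of the same measure-then-record shape using plain long[] holders; TransferTimer is invented, and the real code uses org.apache.hadoop.io.LongWritable with SocketOutputStream.

    /** Toy illustration of the measure-then-record pattern from the hunk above. */
    public class TransferTimer {
      /** Stand-in for transferToFully(): fills the two out-parameters. */
      static void transferToFully(long[] waitNanos, long[] transferNanos)
          throws InterruptedException {
        long start = System.nanoTime();
        Thread.sleep(1);                       // pretend to wait for the socket
        waitNanos[0] = System.nanoTime() - start;

        long transferStart = System.nanoTime();
        Thread.sleep(1);                       // pretend to move the bytes
        transferNanos[0] = System.nanoTime() - transferStart;
      }

      public static void main(String[] args) throws InterruptedException {
        long[] wait = new long[1];
        long[] transfer = new long[1];
        transferToFully(wait, transfer);
        // The caller, not the transfer routine, now owns the metrics objects:
        //   datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(wait[0]);
        //   datanode.metrics.addSendDataPacketTransferNanos(transfer[0]);
        System.out.println("waited " + wait[0] + " ns, transferred in "
            + transfer[0] + " ns");
      }
    }
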
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Mon Jul 30 23:31:42 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.metrics2.annota
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 
@@ -74,19 +75,54 @@ public class DataNodeMetrics {
   @Metric MutableRate heartbeats;
   @Metric MutableRate blockReports;
   @Metric MutableRate packetAckRoundTripTimeNanos;
-
+  MutableQuantiles[] packetAckRoundTripTimeNanosQuantiles;
+  
   @Metric MutableRate flushNanos;
+  MutableQuantiles[] flushNanosQuantiles;
+  
   @Metric MutableRate fsyncNanos;
+  MutableQuantiles[] fsyncNanosQuantiles;
   
   @Metric MutableRate sendDataPacketBlockedOnNetworkNanos;
+  MutableQuantiles[] sendDataPacketBlockedOnNetworkNanosQuantiles;
   @Metric MutableRate sendDataPacketTransferNanos;
+  MutableQuantiles[] sendDataPacketTransferNanosQuantiles;
+  
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
 
-  public DataNodeMetrics(String name, String sessionId) {
+  public DataNodeMetrics(String name, String sessionId, int[] intervals) {
     this.name = name;
     registry.tag(SessionId, sessionId);
+    
+    final int len = intervals.length;
+    packetAckRoundTripTimeNanosQuantiles = new MutableQuantiles[len];
+    flushNanosQuantiles = new MutableQuantiles[len];
+    fsyncNanosQuantiles = new MutableQuantiles[len];
+    sendDataPacketBlockedOnNetworkNanosQuantiles = new MutableQuantiles[len];
+    sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
+    
+    for (int i = 0; i < len; i++) {
+      int interval = intervals[i];
+      packetAckRoundTripTimeNanosQuantiles[i] = registry.newQuantiles(
+          "packetAckRoundTripTimeNanos" + interval + "s",
+          "Packet Ack RTT in ns", "ops", "latency", interval);
+      flushNanosQuantiles[i] = registry.newQuantiles(
+          "flushNanos" + interval + "s", 
+          "Disk flush latency in ns", "ops", "latency", interval);
+      fsyncNanosQuantiles[i] = registry.newQuantiles(
+          "fsyncNanos" + interval + "s", "Disk fsync latency in ns", 
+          "ops", "latency", interval);
+      sendDataPacketBlockedOnNetworkNanosQuantiles[i] = registry.newQuantiles(
+          "sendDataPacketBlockedOnNetworkNanos" + interval + "s", 
+          "Time blocked on network while sending a packet in ns",
+          "ops", "latency", interval);
+      sendDataPacketTransferNanosQuantiles[i] = registry.newQuantiles(
+          "sendDataPacketTransferNanos" + interval + "s", 
+          "Time reading from disk and writing to network while sending " +
+          "a packet in ns", "ops", "latency", interval);
+    }
   }
 
   public static DataNodeMetrics create(Configuration conf, String dnName) {
@@ -94,8 +130,15 @@ public class DataNodeMetrics {
     MetricsSystem ms = DefaultMetricsSystem.instance();
     JvmMetrics.create("DataNode", sessionId, ms);
     String name = "DataNodeActivity-"+ (dnName.isEmpty()
-        ? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt() : dnName.replace(':', '-'));
-    return ms.register(name, null, new DataNodeMetrics(name, sessionId));
+        ? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt() 
+            : dnName.replace(':', '-'));
+
+    // Percentile measurement is off by default, by watching no intervals
+    int[] intervals = 
+        conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
+    
+    return ms.register(name, null, new DataNodeMetrics(name, sessionId,
+        intervals));
   }
 
   public String name() { return name; }
@@ -166,14 +209,23 @@ public class DataNodeMetrics {
 
   public void addPacketAckRoundTripTimeNanos(long latencyNanos) {
     packetAckRoundTripTimeNanos.add(latencyNanos);
+    for (MutableQuantiles q : packetAckRoundTripTimeNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 
   public void addFlushNanos(long latencyNanos) {
     flushNanos.add(latencyNanos);
+    for (MutableQuantiles q : flushNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 
   public void addFsyncNanos(long latencyNanos) {
     fsyncNanos.add(latencyNanos);
+    for (MutableQuantiles q : fsyncNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 
   public void shutdown() {
@@ -196,12 +248,18 @@ public class DataNodeMetrics {
   public void incrBlocksGetLocalPathInfo() {
     blocksGetLocalPathInfo.incr();
   }
-  
-  public MutableRate getSendDataPacketBlockedOnNetworkNanos() {
-    return sendDataPacketBlockedOnNetworkNanos;
+
+  public void addSendDataPacketBlockedOnNetworkNanos(long latencyNanos) {
+    sendDataPacketBlockedOnNetworkNanos.add(latencyNanos);
+    for (MutableQuantiles q : sendDataPacketBlockedOnNetworkNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
-  
-  public MutableRate getSendDataPacketTransferNanos() {
-    return sendDataPacketTransferNanos;
+
+  public void addSendDataPacketTransferNanos(long latencyNanos) {
+    sendDataPacketTransferNanos.add(latencyNanos);
+    for (MutableQuantiles q : sendDataPacketTransferNanosQuantiles) {
+      q.add(latencyNanos);
+    }
   }
 }

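DataNodeMetrics now keeps, next to each MutableRate, an array of MutableQuantiles (one per configured interval), and every add*() call fans the sample out to all of them. The pattern in miniature, with plain collections standing in for the metrics2 types; LatencyRecorder and its fields are illustrative only.

    import java.util.ArrayList;
    import java.util.List;

    /** Toy fan-out mirroring addFlushNanos(): one aggregate plus per-window samples. */
    class LatencyRecorder {
      private long flushNanosTotal;                      // stands in for the MutableRate
      private final List<List<Long>> flushNanosWindows;  // stands in for MutableQuantiles[]

      LatencyRecorder(int[] intervalsSeconds) {
        flushNanosWindows = new ArrayList<List<Long>>();
        for (int i = 0; i < intervalsSeconds.length; i++) {
          flushNanosWindows.add(new ArrayList<Long>());
        }
      }

      /** Update the running aggregate, then fan the sample out to every window. */
      void addFlushNanos(long latencyNanos) {
        flushNanosTotal += latencyNanos;
        for (List<Long> window : flushNanosWindows) {
          window.add(latencyNanos);
        }
      }
    }
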
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointSignature.java Mon Jul 30 23:31:42 2012
@@ -113,12 +113,19 @@ public class CheckpointSignature extends
          + blockpoolID ;
   }
 
+  boolean storageVersionMatches(StorageInfo si) throws IOException {
+    return (layoutVersion == si.layoutVersion) && (cTime == si.cTime);
+  }
+
+  boolean isSameCluster(FSImage si) {
+    return namespaceID == si.getStorage().namespaceID &&
+      clusterID.equals(si.getClusterID()) &&
+      blockpoolID.equals(si.getBlockPoolID());
+  }
+
   void validateStorageInfo(FSImage si) throws IOException {
-    if(layoutVersion != si.getStorage().layoutVersion
-       || namespaceID != si.getStorage().namespaceID 
-       || cTime != si.getStorage().cTime
-       || !clusterID.equals(si.getClusterID())
-       || !blockpoolID.equals(si.getBlockPoolID())) {
+    if (!isSameCluster(si)
+        || !storageVersionMatches(si.getStorage())) {
       throw new IOException("Inconsistent checkpoint fields.\n"
           + "LV = " + layoutVersion + " namespaceID = " + namespaceID
           + " cTime = " + cTime

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Jul 30 23:31:42 2012
@@ -230,8 +230,15 @@ public class FSDirectory implements Clos
 
     // Always do an implicit mkdirs for parent directory tree.
     long modTime = now();
-    if (!mkdirs(new Path(path).getParent().toString(), permissions, true,
-        modTime)) {
+    
+    Path parent = new Path(path).getParent();
+    if (parent == null) {
+      // Trying to add "/" as a file - this path has no
+      // parent -- avoids an NPE below.
+      return null;
+    }
+    
+    if (!mkdirs(parent.toString(), permissions, true, modTime)) {
       return null;
     }
     INodeFileUnderConstruction newNode = new INodeFileUnderConstruction(

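The FSDirectory change guards the one path that has no parent: adding "/" itself as a file, where getParent() returns null and the old code would have hit a NullPointerException before reaching mkdirs(). The same contract holds for the JDK's path API, which makes for a quick demonstration; the sketch below uses java.nio.file rather than org.apache.hadoop.fs.Path.

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class RootParentSketch {
      public static void main(String[] args) {
        Path root = Paths.get("/");
        // The root has no parent, the edge case the new null check covers.
        System.out.println(root.getParent());              // prints "null"
        System.out.println(Paths.get("/a/b").getParent()); // prints "/a"
      }
    }
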
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Mon Jul 30 23:31:42 2012
@@ -437,18 +437,16 @@ public class SecondaryNameNode implement
     // Returns a token that would be used to upload the merged image.
     CheckpointSignature sig = namenode.rollEditLog();
     
-    // Make sure we're talking to the same NN!
-    if (checkpointImage.getNamespaceID() != 0) {
-      // If the image actually has some data, make sure we're talking
-      // to the same NN as we did before.
-      sig.validateStorageInfo(checkpointImage);
-    } else {
-      // if we're a fresh 2NN, just take the storage info from the server
-      // we first talk to.
+    if ((checkpointImage.getNamespaceID() == 0) ||
+        (sig.isSameCluster(checkpointImage) &&
+         !sig.storageVersionMatches(checkpointImage.getStorage()))) {
+      // if we're a fresh 2NN, or if we're on the same cluster and our storage
+      // needs an upgrade, just take the storage info from the server.
       dstStorage.setStorageInfo(sig);
       dstStorage.setClusterID(sig.getClusterID());
       dstStorage.setBlockPoolID(sig.getBlockpoolID());
     }
+    sig.validateStorageInfo(checkpointImage);
 
     // error simulation code for junit test
     CheckpointFaultInjector.getInstance().afterSecondaryCallsRollEditLog();
@@ -703,7 +701,7 @@ public class SecondaryNameNode implement
     /**
      * Analyze checkpoint directories.
      * Create directories if they do not exist.
-     * Recover from an unsuccessful checkpoint is necessary.
+     * Recover from an unsuccessful checkpoint if necessary.
      *
      * @throws IOException
      */

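The SecondaryNameNode change reuses the two predicates split out of CheckpointSignature above: a fresh 2NN, or a 2NN on the same cluster whose storage layout version or cTime lags the active NameNode, adopts the NameNode's storage info before the now unconditional validateStorageInfo() call. The decision, reduced to booleans; CheckpointDecisionSketch is illustrative only.

    /** Condensed form of the checkpoint decision in the hunk above. */
    class CheckpointDecisionSketch {
      static boolean shouldAdoptStorageInfo(boolean freshCheckpointer,
          boolean sameCluster, boolean storageVersionMatches) {
        // Fresh 2NN: take whatever the active NN reports.
        // Same cluster, different layout version or cTime: take the upgraded info.
        // Otherwise: keep local info and let validateStorageInfo() reject mismatches.
        return freshCheckpointer || (sameCluster && !storageVersionMatches);
      }
    }
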
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Mon Jul 30 23:31:42 2012
@@ -17,17 +17,20 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.metrics;
 
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
-import static org.apache.hadoop.metrics2.impl.MsInfo.*;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 
@@ -57,15 +60,31 @@ public class NameNodeMetrics {
 
   @Metric("Journal transactions") MutableRate transactions;
   @Metric("Journal syncs") MutableRate syncs;
+  MutableQuantiles[] syncsQuantiles;
   @Metric("Journal transactions batched in sync")
   MutableCounterLong transactionsBatchedInSync;
   @Metric("Block report") MutableRate blockReport;
+  MutableQuantiles[] blockReportQuantiles;
 
   @Metric("Duration in SafeMode at startup") MutableGaugeInt safeModeTime;
   @Metric("Time loading FS Image at startup") MutableGaugeInt fsImageLoadTime;
 
-  NameNodeMetrics(String processName, String sessionId) {
+  NameNodeMetrics(String processName, String sessionId, int[] intervals) {
     registry.tag(ProcessName, processName).tag(SessionId, sessionId);
+    
+    final int len = intervals.length;
+    syncsQuantiles = new MutableQuantiles[len];
+    blockReportQuantiles = new MutableQuantiles[len];
+    
+    for (int i = 0; i < len; i++) {
+      int interval = intervals[i];
+      syncsQuantiles[i] = registry.newQuantiles(
+          "syncs" + interval + "s",
+          "Journal syncs", "ops", "latency", interval);
+      blockReportQuantiles[i] = registry.newQuantiles(
+          "blockReport" + interval + "s", 
+          "Block report", "ops", "latency", interval);
+    }
   }
 
   public static NameNodeMetrics create(Configuration conf, NamenodeRole r) {
@@ -73,7 +92,11 @@ public class NameNodeMetrics {
     String processName = r.toString();
     MetricsSystem ms = DefaultMetricsSystem.instance();
     JvmMetrics.create(processName, sessionId, ms);
-    return ms.register(new NameNodeMetrics(processName, sessionId));
+    
+    // Percentile measurement is off by default, by watching no intervals
+    int[] intervals = 
+        conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
+    return ms.register(new NameNodeMetrics(processName, sessionId, intervals));
   }
 
   public void shutdown() {
@@ -146,6 +169,9 @@ public class NameNodeMetrics {
 
   public void addSync(long elapsed) {
     syncs.add(elapsed);
+    for (MutableQuantiles q : syncsQuantiles) {
+      q.add(elapsed);
+    }
   }
 
   public void setFsImageLoadTime(long elapsed) {
@@ -154,6 +180,9 @@ public class NameNodeMetrics {
 
   public void addBlockReport(long latency) {
     blockReport.add(latency);
+    for (MutableQuantiles q : blockReportQuantiles) {
+      q.add(latency);
+    }
   }
 
   public void setSafeModeTime(long elapsed) {

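NameNodeMetrics gets the same treatment as DataNodeMetrics: per-interval MutableQuantiles for syncs and block reports, driven by DFS_METRICS_PERCENTILES_INTERVALS_KEY, which is empty by default so percentile measurement stays off. A small usage sketch for turning it on; it assumes the key is the usual public String constant on DFSConfigKeys and that the values are window lengths in seconds, as generated metric names such as "syncs60s" suggest.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class QuantileConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Two windows: 60-second and 300-second percentile estimates.
        conf.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "60,300");

        int[] intervals =
            conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
        for (int interval : intervals) {
          System.out.println("would register a " + interval + "s quantile window");
        }
      }
    }
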
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Mon Jul 30 23:31:42 2012
@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@@ -88,6 +89,7 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -147,6 +149,7 @@ public class WebHdfsFileSystem extends F
   private URI uri;
   private Token<?> delegationToken;
   private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
+  private RetryPolicy retryPolicy = null;
   private Path workingDir;
 
   {
@@ -179,6 +182,7 @@ public class WebHdfsFileSystem extends F
       throw new IllegalArgumentException(e);
     }
     this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
+    this.retryPolicy = NameNodeProxies.getDefaultRetryPolicy(conf);
     this.workingDir = getHomeDirectory();
 
     if (UserGroupInformation.isSecurityEnabled()) {
@@ -276,13 +280,13 @@ public class WebHdfsFileSystem extends F
   }
 
   private static Map<?, ?> validateResponse(final HttpOpParam.Op op,
-      final HttpURLConnection conn) throws IOException {
+      final HttpURLConnection conn, boolean unwrapException) throws IOException {
     final int code = conn.getResponseCode();
     if (code != op.getExpectedHttpResponseCode()) {
       final Map<?, ?> m;
       try {
         m = jsonParse(conn, true);
-      } catch(IOException e) {
+      } catch(Exception e) {
         throw new IOException("Unexpected HTTP response: code=" + code + " != "
             + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
             + ", message=" + conn.getResponseMessage(), e);
@@ -293,22 +297,43 @@ public class WebHdfsFileSystem extends F
       }
 
       final RemoteException re = JsonUtil.toRemoteException(m);
-      throw re.unwrapRemoteException(AccessControlException.class,
-          InvalidToken.class,
-          AuthenticationException.class,
-          AuthorizationException.class,
-          FileAlreadyExistsException.class,
-          FileNotFoundException.class,
-          ParentNotDirectoryException.class,
-          UnresolvedPathException.class,
-          SafeModeException.class,
-          DSQuotaExceededException.class,
-          NSQuotaExceededException.class);
+      throw unwrapException? toIOException(re): re;
     }
     return null;
   }
 
   /**
+   * Covert an exception to an IOException.
+   * 
+   * For a non-IOException, wrap it with IOException.
+   * For a RemoteException, unwrap it.
+   * For an IOException which is not a RemoteException, return it. 
+   */
+  private static IOException toIOException(Exception e) {
+    if (!(e instanceof IOException)) {
+      return new IOException(e);
+    }
+
+    final IOException ioe = (IOException)e;
+    if (!(ioe instanceof RemoteException)) {
+      return ioe;
+    }
+
+    final RemoteException re = (RemoteException)ioe;
+    return re.unwrapRemoteException(AccessControlException.class,
+        InvalidToken.class,
+        AuthenticationException.class,
+        AuthorizationException.class,
+        FileAlreadyExistsException.class,
+        FileNotFoundException.class,
+        ParentNotDirectoryException.class,
+        UnresolvedPathException.class,
+        SafeModeException.class,
+        DSQuotaExceededException.class,
+        NSQuotaExceededException.class);
+  }
+
+  /**
    * Return a URL pointing to given path on the namenode.
    *
    * @param path to obtain the URL for
@@ -362,67 +387,13 @@ public class WebHdfsFileSystem extends F
   }
 
   private HttpURLConnection getHttpUrlConnection(URL url)
-      throws IOException {
+      throws IOException, AuthenticationException {
     final HttpURLConnection conn;
-    try {
-      if (ugi.hasKerberosCredentials()) { 
-        conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
-      } else {
-        conn = (HttpURLConnection)url.openConnection();
-      }
-    } catch (AuthenticationException e) {
-      throw new IOException("Authentication failed, url=" + url, e);
-    }
-    return conn;
-  }
-  
-  private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath,
-      final Param<?,?>... parameters) throws IOException {
-    final URL url = toUrl(op, fspath, parameters);
-
-    //connect and get response
-    HttpURLConnection conn = getHttpUrlConnection(url);
-    try {
-      conn.setRequestMethod(op.getType().toString());
-      if (op.getDoOutput()) {
-        conn = twoStepWrite(conn, op);
-        conn.setRequestProperty("Content-Type", "application/octet-stream");
-      }
-      conn.setDoOutput(op.getDoOutput());
-      conn.connect();
-      return conn;
-    } catch (IOException e) {
-      conn.disconnect();
-      throw e;
+    if (ugi.hasKerberosCredentials()) { 
+      conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
+    } else {
+      conn = (HttpURLConnection)url.openConnection();
     }
-  }
-  
-  /**
-   * Two-step Create/Append:
-   * Step 1) Submit a Http request with neither auto-redirect nor data. 
-   * Step 2) Submit another Http request with the URL from the Location header with data.
-   * 
-   * The reason of having two-step create/append is for preventing clients to
-   * send out the data before the redirect. This issue is addressed by the
-   * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
-   * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
-   * and Java 6 http client), which do not correctly implement "Expect:
-   * 100-continue". The two-step create/append is a temporary workaround for
-   * the software library bugs.
-   */
-  static HttpURLConnection twoStepWrite(HttpURLConnection conn,
-      final HttpOpParam.Op op) throws IOException {
-    //Step 1) Submit a Http request with neither auto-redirect nor data. 
-    conn.setInstanceFollowRedirects(false);
-    conn.setDoOutput(false);
-    conn.connect();
-    validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn);
-    final String redirect = conn.getHeaderField("Location");
-    conn.disconnect();
-
-    //Step 2) Submit another Http request with the URL from the Location header with data.
-    conn = (HttpURLConnection)new URL(redirect).openConnection();
-    conn.setRequestMethod(op.getType().toString());
     return conn;
   }
 
@@ -438,12 +409,158 @@ public class WebHdfsFileSystem extends F
    */
   private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
       final Param<?,?>... parameters) throws IOException {
-    final HttpURLConnection conn = httpConnect(op, fspath, parameters);
-    try {
-      final Map<?, ?> m = validateResponse(op, conn);
-      return m != null? m: jsonParse(conn, false);
-    } finally {
-      conn.disconnect();
+    return new Runner(op, fspath, parameters).run().json;
+  }
+
+  /**
+   * This class is for initialing a HTTP connection, connecting to server,
+   * obtaining a response, and also handling retry on failures.
+   */
+  class Runner {
+    private final HttpOpParam.Op op;
+    private final URL url;
+    private final boolean redirected;
+
+    private boolean checkRetry;
+    private HttpURLConnection conn = null;
+    private Map<?, ?> json = null;
+
+    Runner(final HttpOpParam.Op op, final URL url, final boolean redirected) {
+      this.op = op;
+      this.url = url;
+      this.redirected = redirected;
+    }
+
+    Runner(final HttpOpParam.Op op, final Path fspath,
+        final Param<?,?>... parameters) throws IOException {
+      this(op, toUrl(op, fspath, parameters), false);
+    }
+
+    Runner(final HttpOpParam.Op op, final HttpURLConnection conn) {
+      this(op, null, false);
+      this.conn = conn;
+    }
+
+    private void init() throws IOException {
+      checkRetry = !redirected;
+      try {
+        conn = getHttpUrlConnection(url);
+      } catch(AuthenticationException ae) {
+        checkRetry = false;
+        throw new IOException("Authentication failed, url=" + url, ae);
+      }
+    }
+    
+    private void connect() throws IOException {
+      connect(op.getDoOutput());
+    }
+
+    private void connect(boolean doOutput) throws IOException {
+      conn.setRequestMethod(op.getType().toString());
+      conn.setDoOutput(doOutput);
+      conn.setInstanceFollowRedirects(false);
+      conn.connect();
+    }
+
+    private void disconnect() {
+      if (conn != null) {
+        conn.disconnect();
+        conn = null;
+      }
+    }
+
+    Runner run() throws IOException {
+      for(int retry = 0; ; retry++) {
+        try {
+          init();
+          if (op.getDoOutput()) {
+            twoStepWrite();
+          } else {
+            getResponse(op != GetOpParam.Op.OPEN);
+          }
+          return this;
+        } catch(IOException ioe) {
+          shouldRetry(ioe, retry);
+        }
+      }
+    }
+
+    private void shouldRetry(final IOException ioe, final int retry
+        ) throws IOException {
+      if (checkRetry) {
+        try {
+          final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
+              ioe, retry, 0, true);
+          if (a.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
+            LOG.info("Retrying connect to namenode: " + nnAddr
+                + ". Already tried " + retry + " time(s); retry policy is "
+                + retryPolicy + ", delay " + a.delayMillis + "ms.");      
+            Thread.sleep(a.delayMillis);
+            return;
+          }
+        } catch(Exception e) {
+          LOG.warn("Original exception is ", ioe);
+          throw toIOException(e);
+        }
+      }
+      throw toIOException(ioe);
+    }
+
+    /**
+     * Two-step Create/Append:
+     * Step 1) Submit a Http request with neither auto-redirect nor data. 
+     * Step 2) Submit another Http request with the URL from the Location header with data.
+     * 
+     * The reason of having two-step create/append is for preventing clients to
+     * send out the data before the redirect. This issue is addressed by the
+     * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
+     * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
+     * and Java 6 http client), which do not correctly implement "Expect:
+     * 100-continue". The two-step create/append is a temporary workaround for
+     * the software library bugs.
+     */
+    HttpURLConnection twoStepWrite() throws IOException {
+      //Step 1) Submit a Http request with neither auto-redirect nor data. 
+      connect(false);
+      validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false);
+      final String redirect = conn.getHeaderField("Location");
+      disconnect();
+      checkRetry = false;
+      
+      //Step 2) Submit another Http request with the URL from the Location header with data.
+      conn = (HttpURLConnection)new URL(redirect).openConnection();
+      conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
+      connect();
+      return conn;
+    }
+
+    FSDataOutputStream write(final int bufferSize) throws IOException {
+      return WebHdfsFileSystem.this.write(op, conn, bufferSize);
+    }
+
+    void getResponse(boolean getJsonAndDisconnect) throws IOException {
+      try {
+        connect();
+        if (!redirected && op.getRedirect()) {
+          final String redirect = conn.getHeaderField("Location");
+          json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
+              conn, false);
+          disconnect();
+  
+          checkRetry = false;
+          conn = (HttpURLConnection)new URL(redirect).openConnection();
+          connect();
+        }
+
+        json = validateResponse(op, conn, false);
+        if (json == null && getJsonAndDisconnect) {
+          json = jsonParse(conn, false);
+        }
+      } finally {
+        if (getJsonAndDisconnect) {
+          disconnect();
+        }
+      }
     }
   }
 
@@ -577,7 +694,7 @@ public class WebHdfsFileSystem extends F
           super.close();
         } finally {
           try {
-            validateResponse(op, conn);
+            validateResponse(op, conn, true);
           } finally {
             conn.disconnect();
           }
@@ -593,13 +710,14 @@ public class WebHdfsFileSystem extends F
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PutOpParam.Op.CREATE;
-    final HttpURLConnection conn = httpConnect(op, f, 
+    return new Runner(op, f, 
         new PermissionParam(applyUMask(permission)),
         new OverwriteParam(overwrite),
         new BufferSizeParam(bufferSize),
         new ReplicationParam(replication),
-        new BlockSizeParam(blockSize));
-    return write(op, conn, bufferSize);
+        new BlockSizeParam(blockSize))
+      .run()
+      .write(bufferSize);
   }
 
   @Override
@@ -608,9 +726,9 @@ public class WebHdfsFileSystem extends F
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PostOpParam.Op.APPEND;
-    final HttpURLConnection conn = httpConnect(op, f, 
-        new BufferSizeParam(bufferSize));
-    return write(op, conn, bufferSize);
+    return new Runner(op, f, new BufferSizeParam(bufferSize))
+      .run()
+      .write(bufferSize);
   }
 
   @SuppressWarnings("deprecation")
@@ -637,26 +755,17 @@ public class WebHdfsFileSystem extends F
   }
 
   class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
-    /** The url with offset parameter */
-    private URL offsetUrl;
-  
     OffsetUrlOpener(final URL url) {
       super(url);
     }
 
-    /** Open connection with offset url. */
-    @Override
-    protected HttpURLConnection openConnection() throws IOException {
-      return getHttpUrlConnection(offsetUrl);
-    }
-
-    /** Setup offset url before open connection. */
+    /** Setup offset url and connect. */
     @Override
-    protected HttpURLConnection openConnection(final long offset) throws IOException {
-      offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
-      final HttpURLConnection conn = openConnection();
-      conn.setRequestMethod("GET");
-      return conn;
+    protected HttpURLConnection connect(final long offset,
+        final boolean resolved) throws IOException {
+      final URL offsetUrl = offset == 0L? url
+          : new URL(url + "&" + new OffsetParam(offset));
+      return new Runner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
     }  
   }
 
@@ -697,12 +806,6 @@ public class WebHdfsFileSystem extends F
     OffsetUrlInputStream(OffsetUrlOpener o, OffsetUrlOpener r) {
       super(o, r);
     }
-    
-    @Override
-    protected void checkResponseCode(final HttpURLConnection connection
-        ) throws IOException {
-      validateResponse(GetOpParam.Op.OPEN, connection);
-    }
 
     /** Remove offset parameter before returning the resolved url. */
     @Override
@@ -835,8 +938,7 @@ public class WebHdfsFileSystem extends F
     }
 
     private static WebHdfsFileSystem getWebHdfs(
-        final Token<?> token, final Configuration conf
-        ) throws IOException, InterruptedException, URISyntaxException {
+        final Token<?> token, final Configuration conf) throws IOException {
       
       final InetSocketAddress nnAddr = SecurityUtil.getTokenServiceAddr(token);
       final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr);
@@ -850,12 +952,7 @@ public class WebHdfsFileSystem extends F
       // update the kerberos credentials, if they are coming from a keytab
       ugi.reloginFromKeytab();
 
-      try {
-        WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
-        return webhdfs.renewDelegationToken(token);
-      } catch (URISyntaxException e) {
-        throw new IOException(e);
-      }
+      return getWebHdfs(token, conf).renewDelegationToken(token);
     }
   
     @Override
@@ -865,12 +962,7 @@ public class WebHdfsFileSystem extends F
       // update the kerberos credentials, if they are coming from a keytab
       ugi.checkTGTAndReloginFromKeytab();
 
-      try {
-        final WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
-        webhdfs.cancelDelegationToken(token);
-      } catch (URISyntaxException e) {
-        throw new IOException(e);
-      }
+      getWebHdfs(token, conf).cancelDelegationToken(token);
     }
   }
   

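The new Runner class in WebHdfsFileSystem gathers connection setup, the two-step create/append, manual redirect handling for OPEN and GETFILECHECKSUM, and retry via the NameNode's default retry policy into one place. As a reminder of what the two-step write does at the HTTP level, here is a bare-bones java.net sketch; TwoStepWriteSketch is invented, and the real code additionally sets the op parameters, the Content-Type header, and consults the retry policy between attempts.

    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class TwoStepWriteSketch {
      /** Step 1: ask where to write, sending no data; step 2: write to the redirect. */
      static int putViaRedirect(URL nnUrl, byte[] data) throws IOException {
        HttpURLConnection step1 = (HttpURLConnection) nnUrl.openConnection();
        step1.setRequestMethod("PUT");
        step1.setInstanceFollowRedirects(false); // we want the 307, not an auto-follow
        step1.setDoOutput(false);                // no payload yet
        step1.connect();
        final String redirect = step1.getHeaderField("Location");
        step1.disconnect();

        HttpURLConnection step2 = (HttpURLConnection) new URL(redirect).openConnection();
        step2.setRequestMethod("PUT");
        step2.setDoOutput(true);
        step2.setChunkedStreamingMode(32 << 10); // 32kB chunks, as in the Runner above
        OutputStream out = step2.getOutputStream();
        try {
          out.write(data);                       // data goes only to the redirected URL
        } finally {
          out.close();
        }
        final int code = step2.getResponseCode();
        step2.disconnect();
        return code;
      }
    }
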
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.java Mon Jul 30 23:31:42 2012
@@ -44,6 +44,11 @@ public class DeleteOpParam extends HttpO
     }
 
     @Override
+    public boolean getRedirect() {
+      return false;
+    }
+
+    @Override
     public int getExpectedHttpResponseCode() {
       return expectedHttpResponseCode;
     }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java Mon Jul 30 23:31:42 2012
@@ -23,25 +23,27 @@ import java.net.HttpURLConnection;
 public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
   /** Get operations. */
   public static enum Op implements HttpOpParam.Op {
-    OPEN(HttpURLConnection.HTTP_OK),
+    OPEN(true, HttpURLConnection.HTTP_OK),
 
-    GETFILESTATUS(HttpURLConnection.HTTP_OK),
-    LISTSTATUS(HttpURLConnection.HTTP_OK),
-    GETCONTENTSUMMARY(HttpURLConnection.HTTP_OK),
-    GETFILECHECKSUM(HttpURLConnection.HTTP_OK),
-
-    GETHOMEDIRECTORY(HttpURLConnection.HTTP_OK),
-    GETDELEGATIONTOKEN(HttpURLConnection.HTTP_OK),
-    GETDELEGATIONTOKENS(HttpURLConnection.HTTP_OK),
+    GETFILESTATUS(false, HttpURLConnection.HTTP_OK),
+    LISTSTATUS(false, HttpURLConnection.HTTP_OK),
+    GETCONTENTSUMMARY(false, HttpURLConnection.HTTP_OK),
+    GETFILECHECKSUM(true, HttpURLConnection.HTTP_OK),
+
+    GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
+    GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK),
+    GETDELEGATIONTOKENS(false, HttpURLConnection.HTTP_OK),
 
     /** GET_BLOCK_LOCATIONS is a private unstable op. */
-    GET_BLOCK_LOCATIONS(HttpURLConnection.HTTP_OK),
+    GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
 
-    NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
+    NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
 
+    final boolean redirect;
     final int expectedHttpResponseCode;
 
-    Op(final int expectedHttpResponseCode) {
+    Op(final boolean redirect, final int expectedHttpResponseCode) {
+      this.redirect = redirect;
       this.expectedHttpResponseCode = expectedHttpResponseCode;
     }
 
@@ -56,6 +58,11 @@ public class GetOpParam extends HttpOpPa
     }
 
     @Override
+    public boolean getRedirect() {
+      return redirect;
+    }
+
+    @Override
     public int getExpectedHttpResponseCode() {
       return expectedHttpResponseCode;
     }

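GetOpParam.Op now records, per operation, whether the NameNode answers with a temporary redirect to a DataNode (OPEN and GETFILECHECKSUM do; pure metadata ops do not), which is what Runner.getResponse() checks via op.getRedirect() before following the Location header itself. A toy enum with the same shape; SketchGetOp is invented and lists only a few ops.

    import java.net.HttpURLConnection;

    /** Toy version of the per-op redirect flag added above. */
    enum SketchGetOp {
      OPEN(true, HttpURLConnection.HTTP_OK),
      GETFILESTATUS(false, HttpURLConnection.HTTP_OK),
      LISTSTATUS(false, HttpURLConnection.HTTP_OK);

      final boolean redirect;           // answered with a 307 to a DataNode?
      final int expectedResponseCode;

      SketchGetOp(boolean redirect, int expectedResponseCode) {
        this.redirect = redirect;
        this.expectedResponseCode = expectedResponseCode;
      }

      public static void main(String[] args) {
        for (SketchGetOp op : values()) {
          // A client follows the Location header manually only when redirect is true.
          System.out.println(op + " redirect=" + op.redirect);
        }
      }
    }
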
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java Mon Jul 30 23:31:42 2012
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hdfs.web.resources;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 import javax.ws.rs.core.Response;
 
 
@@ -42,6 +46,9 @@ public abstract class HttpOpParam<E exte
     /** @return true if the operation will do output. */
     public boolean getDoOutput();
 
+    /** @return true if the operation will be redirected. */
+    public boolean getRedirect();
+
     /** @return true the expected http response code. */
     public int getExpectedHttpResponseCode();
 
@@ -51,15 +58,25 @@ public abstract class HttpOpParam<E exte
 
   /** Expects HTTP response 307 "Temporary Redirect". */
   public static class TemporaryRedirectOp implements Op {
-    static final TemporaryRedirectOp CREATE = new TemporaryRedirectOp(PutOpParam.Op.CREATE);
-    static final TemporaryRedirectOp APPEND = new TemporaryRedirectOp(PostOpParam.Op.APPEND);
+    static final TemporaryRedirectOp CREATE = new TemporaryRedirectOp(
+        PutOpParam.Op.CREATE);
+    static final TemporaryRedirectOp APPEND = new TemporaryRedirectOp(
+        PostOpParam.Op.APPEND);
+    static final TemporaryRedirectOp OPEN = new TemporaryRedirectOp(
+        GetOpParam.Op.OPEN);
+    static final TemporaryRedirectOp GETFILECHECKSUM = new TemporaryRedirectOp(
+        GetOpParam.Op.GETFILECHECKSUM);
     
+    static final List<TemporaryRedirectOp> values
+        = Collections.unmodifiableList(Arrays.asList(
+            new TemporaryRedirectOp[]{CREATE, APPEND, OPEN, GETFILECHECKSUM}));
+
     /** Get an object for the given op. */
     public static TemporaryRedirectOp valueOf(final Op op) {
-      if (op == CREATE.op) {
-        return CREATE;
-      } else if (op == APPEND.op) {
-        return APPEND;
+      for(TemporaryRedirectOp t : values) {
+        if (op == t.op) {
+          return t;
+        }
       }
       throw new IllegalArgumentException(op + " not found.");
     }
@@ -80,6 +97,11 @@ public abstract class HttpOpParam<E exte
       return op.getDoOutput();
     }
 
+    @Override
+    public boolean getRedirect() {
+      return false;
+    }
+
     /** Override the original expected response with "Temporary Redirect". */
     @Override
     public int getExpectedHttpResponseCode() {

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PostOpParam.java Mon Jul 30 23:31:42 2012
@@ -44,6 +44,11 @@ public class PostOpParam extends HttpOpP
     }
 
     @Override
+    public boolean getRedirect() {
+      return true;
+    }
+
+    @Override
     public int getExpectedHttpResponseCode() {
       return expectedHttpResponseCode;
     }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/PutOpParam.java Mon Jul 30 23:31:42 2012
@@ -39,11 +39,11 @@ public class PutOpParam extends HttpOpPa
     
     NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
 
-    final boolean doOutput;
+    final boolean doOutputAndRedirect;
     final int expectedHttpResponseCode;
 
-    Op(final boolean doOutput, final int expectedHttpResponseCode) {
-      this.doOutput = doOutput;
+    Op(final boolean doOutputAndRedirect, final int expectedHttpResponseCode) {
+      this.doOutputAndRedirect = doOutputAndRedirect;
       this.expectedHttpResponseCode = expectedHttpResponseCode;
     }
 
@@ -54,7 +54,12 @@ public class PutOpParam extends HttpOpPa
 
     @Override
     public boolean getDoOutput() {
-      return doOutput;
+      return doOutputAndRedirect;
+    }
+
+    @Override
+    public boolean getRedirect() {
+      return doOutputAndRedirect;
     }
 
     @Override

Propchange: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1363593-1367364

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt Mon Jul 30 23:31:42 2012
@@ -69,5 +69,6 @@ IF(FUSE_FOUND)
         ${JAVA_JVM_LIBRARY}
         hdfs
         m
+        pthread
     )
 ENDIF(FUSE_FOUND)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.c Mon Jul 30 23:31:42 2012
@@ -16,17 +16,38 @@
  * limitations under the License.
  */
 
-#include "hdfs.h"
-#include "fuse_dfs.h"
 #include "fuse_connect.h"
+#include "fuse_dfs.h"
 #include "fuse_users.h" 
+#include "libhdfs/hdfs.h"
+#include "util/tree.h"
 
+#include <inttypes.h>
 #include <limits.h>
+#include <poll.h>
 #include <search.h>
 #include <stdio.h>
 #include <stdlib.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <utime.h>
+
+#define FUSE_CONN_DEFAULT_TIMER_PERIOD      5
+#define FUSE_CONN_DEFAULT_EXPIRY_PERIOD     (5 * 60)
+#define HADOOP_SECURITY_AUTHENTICATION      "hadoop.security.authentication"
+#define HADOOP_FUSE_CONNECTION_TIMEOUT      "hadoop.fuse.connection.timeout"
+#define HADOOP_FUSE_TIMER_PERIOD            "hadoop.fuse.timer.period"
+
+/** Length of the buffer needed by asctime_r */
+#define TIME_STR_LEN 26
+
+struct hdfsConn;
+
+static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b);
+static void hdfsConnExpiry(void);
+static void* hdfsConnExpiryThread(void *v);
 
-#define HADOOP_SECURITY_AUTHENTICATION "hadoop.security.authentication"
+RB_HEAD(hdfsConnTree, hdfsConn);
 
 enum authConf {
     AUTH_CONF_UNKNOWN,
@@ -34,80 +55,308 @@ enum authConf {
     AUTH_CONF_OTHER,
 };
 
-#define MAX_ELEMENTS (16 * 1024)
-static struct hsearch_data *fsTable = NULL;
-static enum authConf hdfsAuthConf = AUTH_CONF_UNKNOWN;
-static pthread_mutex_t tableMutex = PTHREAD_MUTEX_INITIALIZER;
-
-/*
- * Allocate a hash table for fs handles. Returns 0 on success,
- * -1 on failure.
- */
-int allocFsTable(void) {
-  assert(NULL == fsTable);
-  fsTable = calloc(1, sizeof(struct hsearch_data));
-  if (0 == hcreate_r(MAX_ELEMENTS, fsTable)) {
-    ERROR("Unable to initialize connection table");
-    return -1;
+struct hdfsConn {
+  RB_ENTRY(hdfsConn) entry;
+  /** How many threads are currently using this hdfsConnection object */
+  int64_t refcnt;
+  /** The username used to make this connection.  Dynamically allocated. */
+  char *usrname;
+  /** Kerberos ticket cache path, or NULL if this is not a kerberized
+   * connection.  Dynamically allocated. */
+  char *kpath;
+  /** mtime of the kpath, if the kpath is non-NULL */
+  time_t kPathMtime;
+  /** nanosecond component of the mtime of the kpath, if the kpath is non-NULL */
+  long kPathMtimeNs;
+  /** The cached libhdfs fs instance */
+  hdfsFS fs;
+  /** Nonzero if this hdfs connection needs to be closed as soon as possible.
+   * If this is true, the connection has been removed from the tree. */
+  int condemned;
+  /** Number of times we should run the expiration timer on this connection
+   * before removing it. */
+  int expirationCount;
+};
+
+RB_GENERATE(hdfsConnTree, hdfsConn, entry, hdfsConnCompare);
+
+/** Current cached libhdfs connections */
+static struct hdfsConnTree gConnTree;
+
+/** The URI used to make our connections.  Dynamically allocated. */
+static char *gUri;
+
+/** The port used to make our connections, or 0. */
+static int gPort;
+
+/** Lock which protects gConnTree and the hdfsConn objects it contains */
+static pthread_mutex_t gConnMutex;
+
+/** Type of authentication configured */
+static enum authConf gHdfsAuthConf;
+
+/** FUSE connection timer expiration period */
+static int32_t gTimerPeriod;
+
+/** FUSE connection expiry period */
+static int32_t gExpiryPeriod;
+
+/** FUSE timer expiration thread */
+static pthread_t gTimerThread;
+
+/** 
+ * Find out what type of authentication the system administrator
+ * has configured.
+ *
+ * @return     the type of authentication, or AUTH_CONF_UNKNOWN on error.
+ */
+static enum authConf discoverAuthConf(void)
+{
+  int ret;
+  char *val = NULL;
+  enum authConf authConf;
+
+  ret = hdfsConfGetStr(HADOOP_SECURITY_AUTHENTICATION, &val);
+  if (ret)
+    authConf = AUTH_CONF_UNKNOWN;
+  else if (!val)
+    authConf = AUTH_CONF_OTHER;
+  else if (!strcmp(val, "kerberos"))
+    authConf = AUTH_CONF_KERBEROS;
+  else
+    authConf = AUTH_CONF_OTHER;
+  free(val);
+  return authConf;
+}
+
+int fuseConnectInit(const char *nnUri, int port)
+{
+  const char *timerPeriod;
+  int ret;
+
+  gTimerPeriod = FUSE_CONN_DEFAULT_TIMER_PERIOD;
+  ret = hdfsConfGetInt(HADOOP_FUSE_TIMER_PERIOD, &gTimerPeriod);
+  if (ret) {
+    fprintf(stderr, "Unable to determine the configured value for %s.",
+          HADOOP_FUSE_TIMER_PERIOD);
+    return -EINVAL;
+  }
+  if (gTimerPeriod < 1) {
+    fprintf(stderr, "Invalid value %d given for %s.\n",
+          gTimerPeriod, HADOOP_FUSE_TIMER_PERIOD);
+    return -EINVAL;
+  }
+  gExpiryPeriod = FUSE_CONN_DEFAULT_EXPIRY_PERIOD;
+  ret = hdfsConfGetInt(HADOOP_FUSE_CONNECTION_TIMEOUT, &gExpiryPeriod);
+  if (ret) {
+    fprintf(stderr, "Unable to determine the configured value for %s.",
+          HADOOP_FUSE_CONNECTION_TIMEOUT);
+    return -EINVAL;
   }
+  if (gExpiryPeriod < 1) {
+    fprintf(stderr, "Invalid value %d given for %s.\n",
+          gExpiryPeriod, HADOOP_FUSE_CONNECTION_TIMEOUT);
+    return -EINVAL;
+  }
+  gHdfsAuthConf = discoverAuthConf();
+  if (gHdfsAuthConf == AUTH_CONF_UNKNOWN) {
+    fprintf(stderr, "Unable to determine the configured value for %s.",
+          HADOOP_SECURITY_AUTHENTICATION);
+    return -EINVAL;
+  }
+  gPort = port;
+  gUri = strdup(nnUri);
+  if (!gUri) {
+    fprintf(stderr, "fuseConnectInit: OOM allocting nnUri\n");
+    return -ENOMEM;
+  }
+  ret = pthread_mutex_init(&gConnMutex, NULL);
+  if (ret) {
+    free(gUri);
+    fprintf(stderr, "fuseConnectInit: pthread_mutex_init failed with error %d\n",
+            ret);
+    return -ret;
+  }
+  RB_INIT(&gConnTree);
+  ret = pthread_create(&gTimerThread, NULL, hdfsConnExpiryThread, NULL);
+  if (ret) {
+    free(gUri);
+    pthread_mutex_destroy(&gConnMutex);
+    fprintf(stderr, "fuseConnectInit: pthread_create failed with error %d\n",
+            ret);
+    return -ret;
+  }
+  fprintf(stderr, "fuseConnectInit: initialized with timer period %d, "
+          "expiry period %d\n", gTimerPeriod, gExpiryPeriod);
   return 0;
 }
 
-/*
- * Find a fs handle for the given key. Returns a fs handle, 
- * or NULL if there is no fs for the given key.
- */
-static hdfsFS findFs(char *key) {
-  ENTRY entry;
-  ENTRY *entryP = NULL;
-  entry.key = key;
-  if (0 == hsearch_r(entry, FIND, &entryP, fsTable)) {
-    return NULL;
-  }
-  assert(NULL != entryP->data);
-  return (hdfsFS)entryP->data;
-}
-
-/*
- * Insert the given fs handle into the table.
- * Returns 0 on success, -1 on failure.
- */
-static int insertFs(char *key, hdfsFS fs) {
-  ENTRY entry;
-  ENTRY *entryP = NULL;
-  assert(NULL != fs);
-  entry.key = strdup(key);
-  if (entry.key == NULL) {
-    return -1;
-  }
-  entry.data = (void*)fs;
-  if (0 == hsearch_r(entry, ENTER, &entryP, fsTable)) {
-    return -1;
+/**
+ * Compare two libhdfs connections by username
+ *
+ * @param a                The first libhdfs connection
+ * @param b                The second libhdfs connection
+ *
+ * @return                 -1, 0, or 1 depending on whether a < b, a == b, or a > b
+ */
+static int hdfsConnCompare(const struct hdfsConn *a, const struct hdfsConn *b)
+{
+  return strcmp(a->usrname, b->usrname);
+}
+
+/**
+ * Find a libhdfs connection by username
+ *
+ * @param usrname         The username to look up
+ *
+ * @return                The connection, or NULL if none could be found
+ */
+static struct hdfsConn* hdfsConnFind(const char *usrname)
+{
+  struct hdfsConn exemplar;
+
+  memset(&exemplar, 0, sizeof(exemplar));
+  exemplar.usrname = (char*)usrname;
+  return RB_FIND(hdfsConnTree, &gConnTree, &exemplar);
+}
+
+/**
+ * Free the resources associated with a libhdfs connection.
+ *
+ * You must remove the connection from the tree before calling this function.
+ *
+ * @param conn            The libhdfs connection
+ */
+static void hdfsConnFree(struct hdfsConn *conn)
+{
+  int ret;
+
+  ret = hdfsDisconnect(conn->fs);
+  if (ret) {
+    fprintf(stderr, "hdfsConnFree(username=%s): "
+      "hdfsDisconnect failed with error %d\n",
+      (conn->usrname ? conn->usrname : "(null)"), ret);
+  }
+  free(conn->usrname);
+  free(conn->kpath);
+  free(conn);
+}
+
+/**
+ * Convert a time_t to a string.
+ *
+ * @param sec           time in seconds since the epoch
+ * @param buf           (out param) output buffer
+ * @param bufLen        length of output buffer
+ *
+ * @return              0 on success; ENAMETOOLONG if the provided buffer was
+ *                      too short
+ */
+static int timeToStr(time_t sec, char *buf, size_t bufLen)
+{
+  struct tm tm, *out;
+  size_t l;
+
+  if (bufLen < TIME_STR_LEN) {
+    return -ENAMETOOLONG;
   }
+  out = localtime_r(&sec, &tm);
+  asctime_r(out, buf);
+  // strip trailing newline
+  l = strlen(buf);
+  if (l != 0)
+    buf[l - 1] = '\0';
   return 0;
 }
 
 /** 
- * Find out what type of authentication the system administrator
- * has configured.
+ * Check an HDFS connection's Kerberos path.
  *
- * @return     the type of authentication, or AUTH_CONF_UNKNOWN on error.
+ * If the mtime of the Kerberos ticket cache file has changed since we first
+ * opened the connection, mark the connection as condemned and remove it from
+ * the hdfs connection tree.
+ *
+ * @param conn      The HDFS connection
  */
-static enum authConf discoverAuthConf(void)
+static int hdfsConnCheckKpath(const struct hdfsConn *conn)
 {
-    int ret;
-    char *val = NULL;
-    enum authConf authConf;
-
-    ret = hdfsConfGet(HADOOP_SECURITY_AUTHENTICATION, &val);
-    if (ret)
-        authConf = AUTH_CONF_UNKNOWN;
-    else if (!strcmp(val, "kerberos"))
-        authConf = AUTH_CONF_KERBEROS;
-    else
-        authConf = AUTH_CONF_OTHER;
-    free(val);
-    return authConf;
+  int ret;
+  struct stat st;
+  char prevTimeBuf[TIME_STR_LEN], newTimeBuf[TIME_STR_LEN];
+
+  if (stat(conn->kpath, &st) < 0) {
+    ret = errno;
+    if (ret == ENOENT) {
+      fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): the kerberos "
+              "ticket cache file '%s' has disappeared.  Condemning the "
+              "connection.\n", conn->usrname, conn->kpath);
+    } else {
+      fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): stat(%s) "
+              "failed with error code %d.  Pessimistically condemning the "
+              "connection.\n", conn->usrname, conn->kpath, ret);
+    }
+    return -ret;
+  }
+  if ((st.st_mtim.tv_sec != conn->kPathMtime) ||
+      (st.st_mtim.tv_nsec != conn->kPathMtimeNs)) {
+    timeToStr(conn->kPathMtime, prevTimeBuf, sizeof(prevTimeBuf));
+    timeToStr(st.st_mtim.tv_sec, newTimeBuf, sizeof(newTimeBuf));
+    fprintf(stderr, "hdfsConnCheckKpath(conn.usrname=%s): mtime on '%s' "
+            "has changed from '%s' to '%s'.  Condemning the connection "
+            "because our cached Kerberos credentials have probably "
+            "changed.\n", conn->usrname, conn->kpath, prevTimeBuf, newTimeBuf);
+    return -EINTERNAL;
+  }
+  return 0;
+}
+
+/**
+ * Cache expiration logic.
+ *
+ * This function is called periodically by the cache expiration thread.  For
+ * each FUSE connection not currently in use (refcnt == 0) it will decrement the
+ * expirationCount for that connection.  Once the expirationCount reaches 0 for
+ * a connection, it can be garbage collected.
+ *
+ * We also check to see if the Kerberos credentials have changed.  If so, the
+ * connection is immediately condemned, even if it is currently in use.
+ */
+static void hdfsConnExpiry(void)
+{
+  struct hdfsConn *conn, *tmpConn;
+
+  pthread_mutex_lock(&gConnMutex);
+  RB_FOREACH_SAFE(conn, hdfsConnTree, &gConnTree, tmpConn) {
+    if (conn->kpath) {
+      if (hdfsConnCheckKpath(conn)) {
+        conn->condemned = 1;
+        RB_REMOVE(hdfsConnTree, &gConnTree, conn);
+        if (conn->refcnt == 0) {
+          /* If the connection is not in use by any threads, delete it
+           * immediately.  If it is still in use by some threads, the last
+           * thread using it will clean it up later inside hdfsConnRelease. */
+          hdfsConnFree(conn);
+          continue;
+        }
+      }
+    }
+    if (conn->refcnt == 0) {
+      /* If the connection is not currently in use by a thread, check to see if
+       * it ought to be removed because it's too old. */
+      conn->expirationCount--;
+      if (conn->expirationCount <= 0) {
+        if (conn->condemned) {
+          fprintf(stderr, "hdfsConnExpiry: LOGIC ERROR: condemned connection "
+                  "as %s is still in the tree!\n", conn->usrname);
+        }
+        fprintf(stderr, "hdfsConnExpiry: freeing and removing connection as "
+                "%s because it's now too old.\n", conn->usrname);
+        RB_REMOVE(hdfsConnTree, &gConnTree, conn);
+        hdfsConnFree(conn);
+      }
+    }
+  }
+  pthread_mutex_unlock(&gConnMutex);
 }
 
 /**
@@ -129,9 +378,9 @@ static enum authConf discoverAuthConf(vo
  * @param path          (out param) the path to the ticket cache file
  * @param pathLen       length of the path buffer
  */
-static void findKerbTicketCachePath(char *path, size_t pathLen)
+static void findKerbTicketCachePath(struct fuse_context *ctx,
+                                    char *path, size_t pathLen)
 {
-  struct fuse_context *ctx = fuse_get_context();
   FILE *fp = NULL;
   static const char * const KRB5CCNAME = "\0KRB5CCNAME=";
   int c = '\0', pathIdx = 0, keyIdx = 0;
@@ -168,72 +417,213 @@ done:
   }
 }
 
-/*
- * Connect to the NN as the current user/group.
- * Returns a fs handle on success, or NULL on failure.
- */
-hdfsFS doConnectAsUser(const char *nn_uri, int nn_port) {
-  struct hdfsBuilder *bld;
-  uid_t uid = fuse_get_context()->uid;
-  char *user = getUsername(uid);
-  char kpath[PATH_MAX];
+/**
+ * Create a new libhdfs connection.
+ *
+ * @param usrname       Username to use for the new connection
+ * @param ctx           FUSE context to use for the new connection
+ * @param out           (out param) the new libhdfs connection
+ *
+ * @return              0 on success; error code otherwise
+ */
+static int fuseNewConnect(const char *usrname, struct fuse_context *ctx,
+        struct hdfsConn **out)
+{
+  struct hdfsBuilder *bld = NULL;
+  char kpath[PATH_MAX] = { 0 };
+  struct hdfsConn *conn = NULL;
   int ret;
-  hdfsFS fs = NULL;
-  if (NULL == user) {
-    goto done;
-  }
-
-  ret = pthread_mutex_lock(&tableMutex);
-  assert(0 == ret);
+  struct stat st;
 
-  fs = findFs(user);
-  if (NULL == fs) {
-    if (hdfsAuthConf == AUTH_CONF_UNKNOWN) {
-      hdfsAuthConf = discoverAuthConf();
-      if (hdfsAuthConf == AUTH_CONF_UNKNOWN) {
-        ERROR("Unable to determine the configured value for %s.",
-              HADOOP_SECURITY_AUTHENTICATION);
-        goto done;
-      }
-    }
-    bld = hdfsNewBuilder();
-    if (!bld) {
-      ERROR("Unable to create hdfs builder");
-      goto done;
+  conn = calloc(1, sizeof(struct hdfsConn));
+  if (!conn) {
+    fprintf(stderr, "fuseNewConnect: OOM allocating struct hdfsConn\n");
+    ret = -ENOMEM;
+    goto error;
+  }
+  bld = hdfsNewBuilder();
+  if (!bld) {
+    fprintf(stderr, "Unable to create hdfs builder\n");
+    ret = -ENOMEM;
+    goto error;
+  }
+  /* We always want to get a new FileSystem instance here-- that's why we call
+   * hdfsBuilderSetForceNewInstance.  Otherwise the 'cache condemnation' logic
+   * in hdfsConnExpiry will not work correctly, since FileSystem might re-use the
+   * existing cached connection which we wanted to get rid of.
+   */
+  hdfsBuilderSetForceNewInstance(bld);
+  hdfsBuilderSetNameNode(bld, gUri);
+  if (gPort) {
+    hdfsBuilderSetNameNodePort(bld, gPort);
+  }
+  hdfsBuilderSetUserName(bld, usrname);
+  if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
+    findKerbTicketCachePath(ctx, kpath, sizeof(kpath));
+    if (stat(kpath, &st) < 0) {
+      fprintf(stderr, "fuseNewConnect: failed to find Kerberos ticket cache "
+        "file '%s'.  Did you remember to kinit for UID %d?\n",
+        kpath, ctx->uid);
+      ret = -EACCES;
+      goto error;
     }
-    hdfsBuilderSetForceNewInstance(bld);
-    hdfsBuilderSetNameNode(bld, nn_uri);
-    if (nn_port) {
-        hdfsBuilderSetNameNodePort(bld, nn_port);
-    }
-    hdfsBuilderSetUserName(bld, user);
-    if (hdfsAuthConf == AUTH_CONF_KERBEROS) {
-      findKerbTicketCachePath(kpath, sizeof(kpath));
-      hdfsBuilderSetKerbTicketCachePath(bld, kpath);
-    }
-    fs = hdfsBuilderConnect(bld);
-    if (NULL == fs) {
-      int err = errno;
-      ERROR("Unable to create fs for user %s: error code %d", user, err);
-      goto done;
+    conn->kPathMtime = st.st_mtim.tv_sec;
+    conn->kPathMtimeNs = st.st_mtim.tv_nsec;
+    hdfsBuilderSetKerbTicketCachePath(bld, kpath);
+    conn->kpath = strdup(kpath);
+    if (!conn->kpath) {
+      fprintf(stderr, "fuseNewConnect: OOM allocating kpath\n");
+      ret = -ENOMEM;
+      goto error;
     }
-    if (-1 == insertFs(user, fs)) {
-      ERROR("Unable to cache fs for user %s", user);
+  }
+  conn->usrname = strdup(usrname);
+  if (!conn->usrname) {
+    fprintf(stderr, "fuseNewConnect: OOM allocating usrname\n");
+    ret = -ENOMEM;
+    goto error;
+  }
+  conn->fs = hdfsBuilderConnect(bld);
+  bld = NULL;
+  if (!conn->fs) {
+    ret = errno;
+    fprintf(stderr, "fuseNewConnect(usrname=%s): Unable to create fs: "
+            "error code %d\n", usrname, ret);
+    goto error;
+  }
+  RB_INSERT(hdfsConnTree, &gConnTree, conn);
+  *out = conn;
+  return 0;
+
+error:
+  if (bld) {
+    hdfsFreeBuilder(bld);
+  }
+  if (conn) {
+    free(conn->kpath);
+    free(conn->usrname);
+    free(conn);
+  }
+  return ret;
+}
+
+int fuseConnect(const char *usrname, struct fuse_context *ctx,
+                struct hdfsConn **out)
+{
+  int ret;
+  struct hdfsConn* conn;
+
+  pthread_mutex_lock(&gConnMutex);
+  conn = hdfsConnFind(usrname);
+  if (!conn) {
+    ret = fuseNewConnect(usrname, ctx, &conn);
+    if (ret) {
+      pthread_mutex_unlock(&gConnMutex);
+      fprintf(stderr, "fuseConnect(usrname=%s): fuseNewConnect failed with "
+              "error code %d\n", usrname, ret);
+      return ret;
     }
   }
+  conn->refcnt++;
+  conn->expirationCount = (gExpiryPeriod + gTimerPeriod - 1) / gTimerPeriod;
+  if (conn->expirationCount < 2)
+    conn->expirationCount = 2;
+  pthread_mutex_unlock(&gConnMutex);
+  *out = conn;
+  return 0;
+}
 
-done:
-  ret = pthread_mutex_unlock(&tableMutex);
-  assert(0 == ret);
-  free(user);
-  return fs;
+int fuseConnectAsThreadUid(struct hdfsConn **conn)
+{
+  struct fuse_context *ctx;
+  char *usrname;
+  int ret;
+  
+  ctx = fuse_get_context();
+  usrname = getUsername(ctx->uid);
+  ret = fuseConnect(usrname, ctx, conn);
+  free(usrname);
+  return ret;
 }
 
-/*
- * We currently cache a fs handle per-user in this module rather
- * than use the FileSystem cache in the java client. Therefore
- * we do not disconnect the fs handle here.
- */
-int doDisconnect(hdfsFS fs) {
+int fuseConnectTest(void)
+{
+  int ret;
+  struct hdfsConn *conn;
+
+  if (gHdfsAuthConf == AUTH_CONF_KERBEROS) {
+    // TODO: call some method which can tell us whether the FS exists.  To
+    // implement this, we would have to add a method to FileSystem that works
+    // without valid Kerberos authentication.  See HDFS-3674 for details.
+    return 0;
+  }
+  ret = fuseNewConnect("root", NULL, &conn);
+  if (ret) {
+    fprintf(stderr, "fuseConnectTest failed with error code %d\n", ret);
+    return ret;
+  }
+  hdfsConnRelease(conn);
   return 0;
 }
+
+struct hdfs_internal* hdfsConnGetFs(struct hdfsConn *conn)
+{
+  return conn->fs;
+}
+
+void hdfsConnRelease(struct hdfsConn *conn)
+{
+  pthread_mutex_lock(&gConnMutex);
+  conn->refcnt--;
+  if ((conn->refcnt == 0) && (conn->condemned)) {
+    fprintf(stderr, "hdfsConnRelease(usrname=%s): freeing condemend FS!\n",
+      conn->usrname);
+    /* Notice that we're not removing the connection from gConnTree here.
+     * If the connection is condemned, it must have already been removed from
+     * the tree, so that no other threads start using it.
+     */
+    hdfsConnFree(conn);
+  }
+  pthread_mutex_unlock(&gConnMutex);
+}
+
+/**
+ * Get the monotonic time.
+ *
+ * Unlike the wall-clock time, monotonic time only ever goes forward.  If the
+ * user adjusts the time, the monotonic time will not be affected.
+ *
+ * @return        The monotonic time
+ */
+static time_t getMonotonicTime(void)
+{
+  int res;
+  struct timespec ts;
+       
+  res = clock_gettime(CLOCK_MONOTONIC, &ts);
+  if (res)
+    abort();
+  return ts.tv_sec;
+}
+
+/**
+ * FUSE connection expiration thread
+ *
+ */
+static void* hdfsConnExpiryThread(void *v)
+{
+  time_t nextTime, curTime;
+  int waitTime;
+
+  nextTime = getMonotonicTime() + gTimerPeriod;
+  while (1) {
+    curTime = getMonotonicTime();
+    if (curTime >= nextTime) {
+      hdfsConnExpiry();
+      nextTime = curTime + gTimerPeriod;
+    }
+    waitTime = (nextTime - curTime) * 1000;
+    poll(NULL, 0, waitTime);
+  }
+  return NULL;
+}

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.h?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.h (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_connect.h Mon Jul 30 23:31:42 2012
@@ -19,10 +19,72 @@
 #ifndef __FUSE_CONNECT_H__
 #define __FUSE_CONNECT_H__
 
-#include "fuse_dfs.h"
+struct fuse_context;
+struct hdfsConn;
+struct hdfs_internal;
 
-hdfsFS doConnectAsUser(const char *nn_uri, int nn_port);
-int doDisconnect(hdfsFS fs);
-int allocFsTable(void);
+/**
+ * Initialize the fuse connection subsystem.
+ *
+ * This must be called before any of the other functions in this module.
+ *
+ * @param nnUri      The NameNode URI
+ * @param port       The NameNode port
+ *
+ * @return           0 on success; error code otherwise
+ */
+int fuseConnectInit(const char *nnUri, int port);
+
+/**
+ * Get a libhdfs connection.
+ *
+ * If there is an existing connection, it will be reused.  If not, a new one
+ * will be created.
+ *
+ * You must call hdfsConnRelease on the connection you get back!
+ *
+ * @param usrname    The username to use
+ * @param ctx        The FUSE context to use (contains UID, PID of requestor)
+ * @param out        (out param) The HDFS connection
+ *
+ * @return           0 on success; error code otherwise
+ */
+int fuseConnect(const char *usrname, struct fuse_context *ctx,
+                struct hdfsConn **out);
+
+/**
+ * Get a libhdfs connection.
+ *
+ * The same as fuseConnect, except the username will be determined from the FUSE
+ * thread context.
+ *
+ * @param conn       (out param) The HDFS connection
+ *
+ * @return           0 on success; error code otherwise
+ */
+int fuseConnectAsThreadUid(struct hdfsConn **conn);
+
+/**
+ * Test whether we can connect to the HDFS cluster
+ *
+ * @return           0 on success; error code otherwise
+ */
+int fuseConnectTest(void);
+
+/**
+ * Get the hdfsFS associated with an hdfsConn.
+ *
+ * @param conn       The hdfsConn
+ *
+ * @return           the hdfsFS
+ */
+struct hdfs_internal* hdfsConnGetFs(struct hdfsConn *conn);
+
+/**
+ * Release an hdfsConn when we're done with it.
+ *
+ * @param conn       The hdfsConn
+ */
+void hdfsConnRelease(struct hdfsConn *conn);
 
 #endif

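The header above documents each entry point in isolation; the intended lifecycle is easier to see end to end. The following fragment is only a minimal sketch of that lifecycle, not code from this commit: it assumes fuseConnectInit() has already been called once at startup (as fuse_dfs.c does below), and dfs_example_getattr is a hypothetical handler name used purely to show the acquire/use/release pattern around one libhdfs call.

    #include <errno.h>
    #include <string.h>
    #include <sys/stat.h>

    #include <hdfs.h>
    #include "fuse_connect.h"

    /* Hypothetical FUSE getattr handler: acquire the cached per-user
     * connection, use its hdfsFS for a single libhdfs call, release it. */
    static int dfs_example_getattr(const char *path, struct stat *st)
    {
      struct hdfsConn *conn = NULL;
      hdfsFileInfo *info;
      int ret;

      ret = fuseConnectAsThreadUid(&conn);  /* find or create a connection
                                               for the calling thread's UID */
      if (ret)
        return -EIO;
      info = hdfsGetPathInfo(hdfsConnGetFs(conn), path);
      if (!info) {
        hdfsConnRelease(conn);              /* always pair connect/release */
        return -ENOENT;
      }
      memset(st, 0, sizeof(*st));
      st->st_mode = (info->mKind == kObjectKindDirectory) ?
          (S_IFDIR | 0755) : (S_IFREG | 0644);
      st->st_size = info->mSize;
      hdfsFreeFileInfo(info, 1);
      hdfsConnRelease(conn);                /* drop the refcnt; a condemned
                                               connection is freed by the last
                                               thread that releases it */
      return 0;
    }

Holding the connection only for the duration of a single operation is what allows hdfsConnExpiry() to condemn and replace a connection whose Kerberos ticket cache has changed without disturbing callers that are mid-operation.
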
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_context_handle.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_context_handle.h?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_context_handle.h (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_context_handle.h Mon Jul 30 23:31:42 2012
@@ -31,8 +31,6 @@
 //
 typedef struct dfs_context_struct {
   int debug;
-  char *nn_uri;
-  int nn_port;
   int read_only;
   int usetrash;
   int direct_io;

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_dfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_dfs.c?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_dfs.c (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_dfs.c Mon Jul 30 23:31:42 2012
@@ -65,8 +65,19 @@ static struct fuse_operations dfs_oper =
   .truncate = dfs_truncate,
 };
 
+static void print_env_vars(void)
+{
+  const char *cp = getenv("CLASSPATH");
+  const char *ld = getenv("LD_LIBRARY_PATH");
+
+  fprintf(stderr, "LD_LIBRARY_PATH=%s",ld == NULL ? "NULL" : ld);
+  fprintf(stderr, "CLASSPATH=%s",cp == NULL ? "NULL" : cp);
+}
+
 int main(int argc, char *argv[])
 {
+  int ret;
+
   umask(0);
 
   extern const char *program;  
@@ -106,24 +117,22 @@ int main(int argc, char *argv[])
     exit(0);
   }
 
-  // Check connection as root
+  ret = fuseConnectInit(options.nn_uri, options.nn_port);
+  if (ret) {
+    ERROR("FATAL: dfs_init: fuseConnInit failed with error %d!", ret);
+    print_env_vars();
+    exit(EXIT_FAILURE);
+  }
   if (options.initchecks == 1) {
-    hdfsFS tempFS = hdfsConnectAsUser(options.nn_uri, options.nn_port, "root");
-    if (NULL == tempFS) {
-      const char *cp = getenv("CLASSPATH");
-      const char *ld = getenv("LD_LIBRARY_PATH");
-      ERROR("FATAL: misconfiguration - cannot connect to HDFS");
-      ERROR("LD_LIBRARY_PATH=%s",ld == NULL ? "NULL" : ld);
-      ERROR("CLASSPATH=%s",cp == NULL ? "NULL" : cp);
-      exit(1);
-    }
-    if (doDisconnect(tempFS)) {
-      ERROR("FATAL: unable to disconnect from test filesystem.");
-      exit(1);
+    ret = fuseConnectTest();
+    if (ret) {
+      ERROR("FATAL: dfs_init: fuseConnTest failed with error %d!", ret);
+      print_env_vars();
+      exit(EXIT_FAILURE);
     }
   }
 
-  int ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL);
+  ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL);
   fuse_opt_free_args(&args);
   return ret;
 }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_file_handle.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_file_handle.h?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_file_handle.h (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_file_handle.h Mon Jul 30 23:31:42 2012
@@ -22,6 +22,8 @@
 #include <hdfs.h>
 #include <pthread.h>
 
+struct hdfsConn;
+
 /**
  *
  * dfs_fh_struct is passed around for open files. Fuse provides a hook (the context) 
@@ -34,10 +36,10 @@
  */
 typedef struct dfs_fh_struct {
   hdfsFile hdfsFH;
+  struct hdfsConn *conn;
   char *buf;
   tSize bufferSize;  //what is the size of the buffer we have
   off_t buffersStartOffset; //where the buffer starts in the file
-  hdfsFS fs; // for reads/writes need to access as the real user
   pthread_mutex_t mutex;
 } dfs_fh;
 

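Because dfs_fh now carries a struct hdfsConn pointer instead of a bare hdfsFS, the open path is evidently meant to pin one connection for the lifetime of the file handle and the release path to drop it. The sketch below illustrates that pairing under those assumptions; it is not code from this commit, dfs_example_open and dfs_example_release are hypothetical names, and error handling is trimmed to the essentials.

    #define FUSE_USE_VERSION 26

    #include <errno.h>
    #include <fcntl.h>
    #include <fuse.h>
    #include <stdint.h>
    #include <stdlib.h>

    #include <hdfs.h>
    #include "fuse_connect.h"
    #include "fuse_file_handle.h"

    /* Hypothetical open: take a reference on the per-user connection and
     * stash it in the file handle so later reads reuse the same hdfsFS. */
    static int dfs_example_open(const char *path, struct fuse_file_info *fi)
    {
      dfs_fh *fh = calloc(1, sizeof(*fh));

      if (!fh)
        return -ENOMEM;
      if (fuseConnectAsThreadUid(&fh->conn)) {
        free(fh);
        return -EIO;
      }
      fh->hdfsFH = hdfsOpenFile(hdfsConnGetFs(fh->conn), path, O_RDONLY, 0, 0, 0);
      if (!fh->hdfsFH) {
        hdfsConnRelease(fh->conn);
        free(fh);
        return -EIO;
      }
      pthread_mutex_init(&fh->mutex, NULL);
      fi->fh = (uint64_t)(uintptr_t)fh;
      return 0;
    }

    /* Hypothetical release: close the file, then drop the reference taken in
     * open; if the connection was condemned in the meantime, the last
     * releaser frees it inside hdfsConnRelease(). */
    static int dfs_example_release(const char *path, struct fuse_file_info *fi)
    {
      dfs_fh *fh = (dfs_fh*)(uintptr_t)fi->fh;

      hdfsCloseFile(hdfsConnGetFs(fh->conn), fh->hdfsFH);
      hdfsConnRelease(fh->conn);
      pthread_mutex_destroy(&fh->mutex);
      free(fh);
      return 0;
    }

Keeping the refcnt held for as long as the handle is open means a connection whose credentials change mid-stream is only condemned, not freed out from under in-flight file handles.
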
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chmod.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chmod.c?rev=1367365&r1=1367364&r2=1367365&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chmod.c (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/fuse_impls_chmod.c Mon Jul 30 23:31:42 2012
@@ -23,6 +23,8 @@
 
 int dfs_chmod(const char *path, mode_t mode)
 {
+  struct hdfsConn *conn = NULL;
+  hdfsFS fs;
   TRACE1("chmod", path)
   int ret = 0;
   dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
@@ -31,22 +33,24 @@ int dfs_chmod(const char *path, mode_t m
   assert(dfs);
   assert('/' == *path);
 
-  hdfsFS userFS = doConnectAsUser(dfs->nn_uri, dfs->nn_port);
-  if (userFS == NULL) {
-    ERROR("Could not connect to HDFS");
+  ret = fuseConnectAsThreadUid(&conn);
+  if (ret) {
+    fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "
+            "connection!  error %d.\n", ret);
     ret = -EIO;
     goto cleanup;
   }
+  fs = hdfsConnGetFs(conn);
 
-  if (hdfsChmod(userFS, path, (short)mode)) {
+  if (hdfsChmod(fs, path, (short)mode)) {
     ERROR("Could not chmod %s to %d", path, (int)mode);
     ret = (errno > 0) ? -errno : -EIO;
     goto cleanup;
   }
 
 cleanup:
-  if (doDisconnect(userFS)) {
-    ret = -EIO;
+  if (conn) {
+    hdfsConnRelease(conn);
   }
 
   return ret;


