hadoop-common-commits mailing list archives

From rang...@apache.org
Subject svn commit: r774433 [1/3] - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/security/ src/hdfs/ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/balancer/ src/hdfs/org/apache/hadoop/...
Date Wed, 13 May 2009 17:02:58 GMT
Author: rangadi
Date: Wed May 13 17:02:29 2009
New Revision: 774433

URL: http://svn.apache.org/viewvc?rev=774433&view=rev
Log:
HADOOP-4359. Support for data access authorization checking on Datanodes.
(Kan Zhang via rangadi)

Added:
    hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessKey.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessToken.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessTokenHandler.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/ExportedAccessKeys.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java.orig
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/KeyUpdateCommand.java
    hadoop/core/trunk/src/test/core/org/apache/hadoop/security/TestAccessToken.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/hdfs-default.xml
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/JspHelper.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocol.java
    hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
    hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
    hadoop/core/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
    hadoop/core/trunk/src/webapps/datanode/browseBlock.jsp
    hadoop/core/trunk/src/webapps/datanode/tail.jsp

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May 13 17:02:29 2009
@@ -53,6 +53,9 @@
     HADOOP-5679. Resolve findbugs warnings in core/streaming/pipes/examples. 
     (Jothi Padmanabhan via sharad)
 
+    HADOOP-4359. Support for data access authorization checking on Datanodes.
+    (Kan Zhang via rangadi)
+
   NEW FEATURES
 
     HADOOP-4268. Change fsck to use ClientProtocol methods so that the

Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessKey.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessKey.java?rev=774433&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessKey.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessKey.java Wed May 13 17:02:29 2009
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import javax.crypto.Mac;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Key used for generating and verifying access tokens
+ */
+public class AccessKey implements Writable {
+  private long keyID;
+  private Text key;
+  private long expiryDate;
+  private Mac mac;
+
+  public AccessKey() {
+    this(0L, new Text(), 0L);
+  }
+
+  public AccessKey(long keyID, Text key, long expiryDate) {
+    this.keyID = keyID;
+    this.key = key;
+    this.expiryDate = expiryDate;
+  }
+
+  public long getKeyID() {
+    return keyID;
+  }
+
+  public Text getKey() {
+    return key;
+  }
+
+  public long getExpiryDate() {
+    return expiryDate;
+  }
+
+  public Mac getMac() {
+    return mac;
+  }
+
+  public void setMac(Mac mac) {
+    this.mac = mac;
+  }
+
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof AccessKey) {
+      AccessKey that = (AccessKey) obj;
+      return this.keyID == that.keyID && isEqual(this.key, that.key)
+          && this.expiryDate == that.expiryDate;
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    return key == null ? 0 : key.hashCode();
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  /** Write keyID, key and expiryDate to out */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVLong(out, keyID);
+    key.write(out);
+    WritableUtils.writeVLong(out, expiryDate);
+  }
+
+  /** Read keyID, key and expiryDate from in */
+  public void readFields(DataInput in) throws IOException {
+    keyID = WritableUtils.readVLong(in);
+    key.readFields(in);
+    expiryDate = WritableUtils.readVLong(in);
+  }
+}
\ No newline at end of file
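
For reference, a minimal round-trip of the Writable format defined above; the test scaffolding here is hypothetical and not part of the patch:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.security.AccessKey;

  public class AccessKeyRoundTrip {
    public static void main(String[] args) throws IOException {
      // keyID and expiryDate travel as VLongs; the key material is a Text.
      AccessKey key = new AccessKey(1L, new Text("secret"),
          System.currentTimeMillis() + 600L * 60 * 1000);
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      key.write(new DataOutputStream(buf));
      AccessKey copy = new AccessKey();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
      System.out.println(copy.equals(key));   // true: keyID, key and expiry all match
    }
  }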

Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessToken.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessToken.java?rev=774433&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessToken.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessToken.java Wed May 13 17:02:29 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+public class AccessToken implements Writable {
+  public static final AccessToken DUMMY_TOKEN = new AccessToken();
+  private Text tokenID;
+  private Text tokenAuthenticator;
+
+  public AccessToken() {
+    this(new Text(), new Text());
+  }
+
+  public AccessToken(Text tokenID, Text tokenAuthenticator) {
+    this.tokenID = tokenID;
+    this.tokenAuthenticator = tokenAuthenticator;
+  }
+
+  public Text getTokenID() {
+    return tokenID;
+  }
+
+  public Text getTokenAuthenticator() {
+    return tokenAuthenticator;
+  }
+
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof AccessToken) {
+      AccessToken that = (AccessToken) obj;
+      return isEqual(this.tokenID, that.tokenID)
+          && isEqual(this.tokenAuthenticator, that.tokenAuthenticator);
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    return tokenAuthenticator == null ? 0 : tokenAuthenticator.hashCode();
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  /** Write tokenID and tokenAuthenticator to out */
+  public void write(DataOutput out) throws IOException {
+    tokenID.write(out);
+    tokenAuthenticator.write(out);
+  }
+
+  /** Read tokenID and tokenAuthenticator from in */
+  public void readFields(DataInput in) throws IOException {
+    tokenID.readFields(in);
+    tokenAuthenticator.readFields(in);
+  }
+
+}
\ No newline at end of file
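
DUMMY_TOKEN lets callers write a token on the wire even when token checking is disabled, so the stream layout stays the same in both configurations. A sketch of the pattern the Balancer and DataNode use later in this commit (isAccessTokenEnabled and the commented placeholder stand in for the callers' fields):

  import java.io.ByteArrayOutputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
  import org.apache.hadoop.security.AccessToken;

  public class DummyTokenSketch {
    public static void main(String[] args) throws IOException {
      boolean isAccessTokenEnabled = false;   // from dfs.access.token.enable
      DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
      AccessToken accessToken = AccessToken.DUMMY_TOKEN;
      if (isAccessTokenEnabled) {
        // accessToken = accessTokenHandler.generateToken(...) would go here
      }
      accessToken.write(out);   // an empty tokenID and authenticator when disabled
      out.flush();
    }
  }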

Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessTokenHandler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessTokenHandler.java?rev=774433&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessTokenHandler.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessTokenHandler.java Wed May 13 17:02:29 2009
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.security.GeneralSecurityException;
+import java.security.SecureRandom;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * AccessTokenHandler can be instantiated in two modes: master mode and slave
+ * mode. A master can generate new access keys and export them to slaves,
+ * while slaves can only import and use access keys received from the master.
+ * Both master and slave can generate and verify access tokens. Typically,
+ * master mode is used by the NN and slave mode by DNs.
+ */
+public class AccessTokenHandler {
+  private static final Log LOG = LogFactory.getLog(AccessTokenHandler.class);
+  public static final String STRING_ENABLE_ACCESS_TOKEN = "dfs.access.token.enable";
+  public static final String STRING_ACCESS_KEY_UPDATE_INTERVAL = "dfs.access.key.update.interval";
+  public static final String STRING_ACCESS_TOKEN_LIFETIME = "dfs.access.token.lifetime";
+
+  private final boolean isMaster;
+  /*
+   * keyUpdateInterval is the interval at which the NN updates its access keys.
+   * It should be set long enough that all live DNs and the Balancer can sync
+   * their access keys with the NN at least once during each interval.
+   */
+  private final long keyUpdateInterval;
+  private final long tokenLifetime;
+  private long serialNo = new SecureRandom().nextLong();
+  private KeyGenerator keyGen;
+  private AccessKey currentKey;
+  private AccessKey nextKey;
+  private Map<Long, AccessKey> allKeys;
+
+  public static enum AccessMode {
+    READ, WRITE, COPY, REPLACE
+  };
+
+  /**
+   * Constructor
+   * 
+   * @param isMaster whether this instance runs in master mode
+   * @param keyUpdateInterval interval (in ms) at which the master updates its keys
+   * @param tokenLifetime lifetime (in ms) of generated tokens
+   * @throws IOException if key generation fails in master mode
+   */
+  public AccessTokenHandler(boolean isMaster, long keyUpdateInterval,
+      long tokenLifetime) throws IOException {
+    this.isMaster = isMaster;
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenLifetime = tokenLifetime;
+    this.allKeys = new HashMap<Long, AccessKey>();
+    if (isMaster) {
+      try {
+        generateKeys();
+        initMac(currentKey);
+      } catch (GeneralSecurityException e) {
+        throw (IOException) new IOException(
+            "Failed to create AccessTokenHandler").initCause(e);
+      }
+    }
+  }
+
+  /** Initialize access keys */
+  private synchronized void generateKeys() throws NoSuchAlgorithmException {
+    keyGen = KeyGenerator.getInstance("HmacSHA1");
+    /*
+     * Need to set estimated expiry dates for currentKey and nextKey so that if
+     * NN crashes, DN can still expire those keys. NN will stop using the newly
+     * generated currentKey after the first keyUpdateInterval, however it may
+     * still be used by DN and Balancer to generate new tokens before they get a
+     * chance to sync their keys with NN. Since we require keyUpdateInterval to be
+     * long enough so that all live DN's and Balancer will sync their keys with
+     * NN at least once during the period, the estimated expiry date for
+     * currentKey is set to now() + 2 * keyUpdateInterval + tokenLifetime.
+     * Similarly, the estimated expiry date for nextKey is one keyUpdateInterval
+     * more.
+     */
+    serialNo++;
+    currentKey = new AccessKey(serialNo, new Text(keyGen.generateKey()
+        .getEncoded()), System.currentTimeMillis() + 2 * keyUpdateInterval
+        + tokenLifetime);
+    serialNo++;
+    nextKey = new AccessKey(serialNo, new Text(keyGen.generateKey()
+        .getEncoded()), System.currentTimeMillis() + 3 * keyUpdateInterval
+        + tokenLifetime);
+    allKeys.put(currentKey.getKeyID(), currentKey);
+    allKeys.put(nextKey.getKeyID(), nextKey);
+  }
+
+  /** Initialize Mac function */
+  private synchronized void initMac(AccessKey key) throws IOException {
+    try {
+      Mac mac = Mac.getInstance("HmacSHA1");
+      mac.init(new SecretKeySpec(key.getKey().getBytes(), "HmacSHA1"));
+      key.setMac(mac);
+    } catch (GeneralSecurityException e) {
+      throw (IOException) new IOException(
+          "Failed to initialize Mac for access key, keyID=" + key.getKeyID())
+          .initCause(e);
+    }
+  }
+
+  /** Export access keys, only to be used in master mode */
+  public synchronized ExportedAccessKeys exportKeys() {
+    if (!isMaster)
+      return null;
+    if (LOG.isDebugEnabled())
+      LOG.debug("Exporting access keys");
+    return new ExportedAccessKeys(true, keyUpdateInterval, tokenLifetime,
+        currentKey, allKeys.values().toArray(new AccessKey[0]));
+  }
+
+  private synchronized void removeExpiredKeys() {
+    long now = System.currentTimeMillis();
+    for (Iterator<Map.Entry<Long, AccessKey>> it = allKeys.entrySet()
+        .iterator(); it.hasNext();) {
+      Map.Entry<Long, AccessKey> e = it.next();
+      if (e.getValue().getExpiryDate() < now) {
+        it.remove();
+      }
+    }
+  }
+
+  /**
+   * Set access keys, only to be used in slave mode
+   */
+  public synchronized void setKeys(ExportedAccessKeys exportedKeys)
+      throws IOException {
+    if (isMaster || exportedKeys == null)
+      return;
+    LOG.info("Setting access keys");
+    removeExpiredKeys();
+    this.currentKey = exportedKeys.getCurrentKey();
+    initMac(currentKey);
+    AccessKey[] receivedKeys = exportedKeys.getAllKeys();
+    for (int i = 0; i < receivedKeys.length; i++) {
+      if (receivedKeys[i] == null)
+        continue;
+      this.allKeys.put(receivedKeys[i].getKeyID(), receivedKeys[i]);
+    }
+  }
+
+  /**
+   * Update access keys, only to be used in master mode
+   */
+  public synchronized void updateKeys() throws IOException {
+    if (!isMaster)
+      return;
+    LOG.info("Updating access keys");
+    removeExpiredKeys();
+    // set final expiry date of retiring currentKey
+    allKeys.put(currentKey.getKeyID(), new AccessKey(currentKey.getKeyID(),
+        currentKey.getKey(), System.currentTimeMillis() + keyUpdateInterval
+            + tokenLifetime));
+    // update the estimated expiry date of new currentKey
+    currentKey = new AccessKey(nextKey.getKeyID(), nextKey.getKey(), System
+        .currentTimeMillis()
+        + 2 * keyUpdateInterval + tokenLifetime);
+    initMac(currentKey);
+    allKeys.put(currentKey.getKeyID(), currentKey);
+    // generate a new nextKey
+    serialNo++;
+    nextKey = new AccessKey(serialNo, new Text(keyGen.generateKey()
+        .getEncoded()), System.currentTimeMillis() + 3 * keyUpdateInterval
+        + tokenLifetime);
+    allKeys.put(nextKey.getKeyID(), nextKey);
+  }
+
+  /** Verify the token's authenticator against the corresponding access key */
+  private synchronized Boolean verifyToken(long keyID, AccessToken token)
+      throws IOException {
+    AccessKey key = allKeys.get(keyID);
+    if (key == null) {
+      LOG.warn("Access key for keyID=" + keyID + " doesn't exist.");
+      return false;
+    }
+    if (key.getMac() == null) {
+      initMac(key);
+    }
+    Text tokenID = token.getTokenID();
+    Text authenticator = new Text(key.getMac().doFinal(tokenID.getBytes()));
+    return authenticator.equals(token.getTokenAuthenticator());
+  }
+
+  /** Generate an access token for current user */
+  public AccessToken generateToken(long blockID, EnumSet<AccessMode> modes)
+      throws IOException {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+    String userID = (ugi == null ? null : ugi.getUserName());
+    return generateToken(userID, blockID, modes);
+  }
+
+  /** Generate an access token for a specified user */
+  public synchronized AccessToken generateToken(String userID, long blockID,
+      EnumSet<AccessMode> modes) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Generating access token for user=" + userID + ", blockID="
+          + blockID + ", access modes=" + modes + ", keyID="
+          + currentKey.getKeyID());
+    }
+    if (modes == null || modes.isEmpty())
+      throw new IOException("access modes can't be null or empty");
+    ByteArrayOutputStream buf = new ByteArrayOutputStream(4096);
+    DataOutputStream out = new DataOutputStream(buf);
+    WritableUtils.writeVLong(out, System.currentTimeMillis() + tokenLifetime);
+    WritableUtils.writeVLong(out, currentKey.getKeyID());
+    WritableUtils.writeString(out, userID);
+    WritableUtils.writeVLong(out, blockID);
+    WritableUtils.writeVInt(out, modes.size());
+    for (AccessMode aMode : modes) {
+      WritableUtils.writeEnum(out, aMode);
+    }
+    Text tokenID = new Text(buf.toByteArray());
+    return new AccessToken(tokenID, new Text(currentKey.getMac().doFinal(
+        tokenID.getBytes())));
+  }
+
+  /** Check if access should be allowed. userID is not checked if null */
+  public Boolean checkAccess(AccessToken token, String userID, long blockID,
+      AccessMode mode) throws IOException {
+    long oExpiry = 0;
+    long oKeyID = 0;
+    String oUserID = null;
+    long oBlockID = 0;
+    EnumSet<AccessMode> oModes = EnumSet.noneOf(AccessMode.class);
+
+    try {
+      ByteArrayInputStream buf = new ByteArrayInputStream(token.getTokenID()
+          .getBytes());
+      DataInputStream in = new DataInputStream(buf);
+      oExpiry = WritableUtils.readVLong(in);
+      oKeyID = WritableUtils.readVLong(in);
+      oUserID = WritableUtils.readString(in);
+      oBlockID = WritableUtils.readVLong(in);
+      int length = WritableUtils.readVInt(in);
+      for (int i = 0; i < length; ++i) {
+        oModes.add(WritableUtils.readEnum(in, AccessMode.class));
+      }
+    } catch (IOException e) {
+      throw (IOException) new IOException(
+          "Unable to parse access token for user=" + userID + ", blockID="
+              + blockID + ", access mode=" + mode).initCause(e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Verifying access token for user=" + userID + ", blockID="
+          + blockID + ", access mode=" + mode + ", keyID=" + oKeyID);
+    }
+    return (userID == null || userID.equals(oUserID)) && oBlockID == blockID
+        && System.currentTimeMillis() < oExpiry && oModes.contains(mode)
+        && verifyToken(oKeyID, token);
+  }
+
+}
\ No newline at end of file
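
Putting the pieces together, a minimal end-to-end sketch of the two modes using only the API added above (intervals are in milliseconds; the class and variable names are illustrative):

  import java.util.EnumSet;
  import org.apache.hadoop.security.AccessToken;
  import org.apache.hadoop.security.AccessTokenHandler;
  import org.apache.hadoop.security.ExportedAccessKeys;

  public class TokenHandshakeSketch {
    public static void main(String[] args) throws Exception {
      long keyUpdateInterval = 600L * 60 * 1000;   // 600 min, matching the default
      long tokenLifetime = 600L * 60 * 1000;
      // Master (NN) generates keys on construction and can export them.
      AccessTokenHandler master =
          new AccessTokenHandler(true, keyUpdateInterval, tokenLifetime);
      ExportedAccessKeys keys = master.exportKeys();
      // Slave (DN, Balancer) can only import keys received from the master.
      AccessTokenHandler slave =
          new AccessTokenHandler(false, keyUpdateInterval, tokenLifetime);
      slave.setKeys(keys);
      // Master mints a READ token for block 42; the slave verifies it.
      AccessToken t = master.generateToken("alice", 42L,
          EnumSet.of(AccessTokenHandler.AccessMode.READ));
      System.out.println(
          slave.checkAccess(t, "alice", 42L, AccessTokenHandler.AccessMode.READ));
    }
  }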

Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/ExportedAccessKeys.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/ExportedAccessKeys.java?rev=774433&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/ExportedAccessKeys.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/ExportedAccessKeys.java Wed May 13 17:02:29 2009
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Object for passing access keys
+ */
+public class ExportedAccessKeys implements Writable {
+  public static final ExportedAccessKeys DUMMY_KEYS = new ExportedAccessKeys();
+  private boolean isAccessTokenEnabled;
+  private long keyUpdateInterval;
+  private long tokenLifetime;
+  private AccessKey currentKey;
+  private AccessKey[] allKeys;
+
+  public ExportedAccessKeys() {
+    this(false, 0, 0, new AccessKey(), new AccessKey[0]);
+  }
+
+  ExportedAccessKeys(boolean isAccessTokenEnabled, long keyUpdateInterval,
+      long tokenLifetime, AccessKey currentKey, AccessKey[] allKeys) {
+    this.isAccessTokenEnabled = isAccessTokenEnabled;
+    this.keyUpdateInterval = keyUpdateInterval;
+    this.tokenLifetime = tokenLifetime;
+    this.currentKey = currentKey;
+    this.allKeys = allKeys;
+  }
+
+  public boolean isAccessTokenEnabled() {
+    return isAccessTokenEnabled;
+  }
+
+  public long getKeyUpdateInterval() {
+    return keyUpdateInterval;
+  }
+
+  public long getTokenLifetime() {
+    return tokenLifetime;
+  }
+
+  public AccessKey getCurrentKey() {
+    return currentKey;
+  }
+
+  public AccessKey[] getAllKeys() {
+    return allKeys;
+  }
+
+  static boolean isEqual(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** {@inheritDoc} */
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj instanceof ExportedAccessKeys) {
+      ExportedAccessKeys that = (ExportedAccessKeys) obj;
+      return this.isAccessTokenEnabled == that.isAccessTokenEnabled
+          && this.keyUpdateInterval == that.keyUpdateInterval
+          && this.tokenLifetime == that.tokenLifetime
+          && isEqual(this.currentKey, that.currentKey)
+          && Arrays.equals(this.allKeys, that.allKeys);
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public int hashCode() {
+    return currentKey == null ? 0 : currentKey.hashCode();
+  }
+
+  // ///////////////////////////////////////////////
+  // Writable
+  // ///////////////////////////////////////////////
+  static { // register a ctor
+    WritableFactories.setFactory(ExportedAccessKeys.class,
+        new WritableFactory() {
+          public Writable newInstance() {
+            return new ExportedAccessKeys();
+          }
+        });
+  }
+
+  /** Write the enabled flag, intervals, currentKey and allKeys to out */
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(isAccessTokenEnabled);
+    out.writeLong(keyUpdateInterval);
+    out.writeLong(tokenLifetime);
+    currentKey.write(out);
+    out.writeInt(allKeys.length);
+    for (int i = 0; i < allKeys.length; i++) {
+      allKeys[i].write(out);
+    }
+  }
+
+  /** Read the enabled flag, intervals, currentKey and allKeys from in */
+  public void readFields(DataInput in) throws IOException {
+    isAccessTokenEnabled = in.readBoolean();
+    keyUpdateInterval = in.readLong();
+    tokenLifetime = in.readLong();
+    currentKey.readFields(in);
+    this.allKeys = new AccessKey[in.readInt()];
+    for (int i = 0; i < allKeys.length; i++) {
+      allKeys[i] = new AccessKey();
+      allKeys[i].readFields(in);
+    }
+  }
+
+}
\ No newline at end of file

Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java?rev=774433&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/InvalidAccessTokenException.java Wed May 13 17:02:29 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security;
+
+import java.io.IOException;
+
+/**
+ * Access token verification failed.
+ */
+public class InvalidAccessTokenException extends IOException {
+  private static final long serialVersionUID = 168L;
+
+  public InvalidAccessTokenException() {
+    super();
+  }
+
+  public InvalidAccessTokenException(String msg) {
+    super(msg);
+  }
+}

Modified: hadoop/core/trunk/src/hdfs/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/hdfs-default.xml?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/hdfs-default.xml (original)
+++ hadoop/core/trunk/src/hdfs/hdfs-default.xml Wed May 13 17:02:29 2009
@@ -206,6 +206,29 @@
 </property>
 
 <property>
+  <name>dfs.access.token.enable</name>
+  <value>false</value>
+  <description>
+    If "true", access tokens are used as capabilities for accessing datanodes.
+    If "false", no access tokens are checked on accessing datanodes.
+  </description>
+</property>
+
+<property>
+  <name>dfs.access.key.update.interval</name>
+  <value>600</value>
+  <description>
+    Interval in minutes at which namenode updates its access keys.
+  </description>
+</property>
+
+<property>
+  <name>dfs.access.token.lifetime</name>
+  <value>600</value>
+  <description>The lifetime of access tokens in minutes.</description>
+</property>
+
+<property>
   <name>dfs.data.dir</name>
   <value>${hadoop.tmp.dir}/dfs/data</value>
   <description>Determines where on the local filesystem an DFS data node
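
A deployment turns checking on by overriding these properties in hdfs-site.xml; note that both intervals are expressed in minutes. The constants added to AccessTokenHandler name the keys, as in this hypothetical sketch:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.security.AccessTokenHandler;

  public class ReadTokenConf {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      boolean enabled =
          conf.getBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
      // Both values are configured in minutes (see the descriptions above).
      long updateMin =
          conf.getLong(AccessTokenHandler.STRING_ACCESS_KEY_UPDATE_INTERVAL, 600);
      long lifetimeMin =
          conf.getLong(AccessTokenHandler.STRING_ACCESS_TOKEN_LIFETIME, 600);
      System.out.println(enabled + ", " + updateMin + " min, " + lifetimeMin + " min");
    }
  }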

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Wed May 13 17:02:29 2009
@@ -33,7 +33,9 @@
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.security.InvalidAccessTokenException;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.*;
 
@@ -566,14 +568,21 @@
       ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
       ) throws IOException {
     //get all block locations
-    final List<LocatedBlock> locatedblocks
+    List<LocatedBlock> locatedblocks
         = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE).getLocatedBlocks();
     final DataOutputBuffer md5out = new DataOutputBuffer();
     int bytesPerCRC = 0;
     long crcPerBlock = 0;
+    boolean refetchBlocks = false;
+    int lastRetriedIndex = -1;
 
     //get block checksum for each block
     for(int i = 0; i < locatedblocks.size(); i++) {
+      if (refetchBlocks) {  // refetch to get fresh tokens
+        locatedblocks = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE)
+            .getLocatedBlocks();
+        refetchBlocks = false;
+      }
       LocatedBlock lb = locatedblocks.get(i);
       final Block block = lb.getBlock();
       final DatanodeInfo[] datanodes = lb.getLocations();
@@ -605,12 +614,28 @@
           out.write(DataTransferProtocol.OP_BLOCK_CHECKSUM);
           out.writeLong(block.getBlockId());
           out.writeLong(block.getGenerationStamp());
+          lb.getAccessToken().write(out);
           out.flush();
          
           final short reply = in.readShort();
           if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
-            throw new IOException("Bad response " + reply + " for block "
-                + block + " from datanode " + datanodes[j].getName());
+            if (reply == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN
+                && i > lastRetriedIndex) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
+                    + "for file " + src + " for block " + block
+                    + " from datanode " + datanodes[j].getName()
+                    + ". Will retry the block once.");
+              }
+              lastRetriedIndex = i;
+              done = true; // actually it's not done; but we'll retry
+              i--; // repeat at i-th block
+              refetchBlocks = true;
+              break;
+            } else {
+              throw new IOException("Bad response " + reply + " for block "
+                  + block + " from datanode " + datanodes[j].getName());
+            }
           }
 
           //read byte-per-checksum
@@ -1248,24 +1273,26 @@
       checksumSize = this.checksum.getChecksumSize();
     }
 
-    public static BlockReader newBlockReader(Socket sock, String file, long blockId, 
+    public static BlockReader newBlockReader(Socket sock, String file, long blockId, AccessToken accessToken, 
         long genStamp, long startOffset, long len, int bufferSize) throws IOException {
-      return newBlockReader(sock, file, blockId, genStamp, startOffset, len, bufferSize,
+      return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
           true);
     }
 
     /** Java Doc required */
     public static BlockReader newBlockReader( Socket sock, String file, long blockId, 
+                                       AccessToken accessToken,
                                        long genStamp,
                                        long startOffset, long len,
                                        int bufferSize, boolean verifyChecksum)
                                        throws IOException {
-      return newBlockReader(sock, file, blockId, genStamp, startOffset,
+      return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset,
                             len, bufferSize, verifyChecksum, "");
     }
 
     public static BlockReader newBlockReader( Socket sock, String file,
                                        long blockId, 
+                                       AccessToken accessToken,
                                        long genStamp,
                                        long startOffset, long len,
                                        int bufferSize, boolean verifyChecksum,
@@ -1283,6 +1310,7 @@
       out.writeLong( startOffset );
       out.writeLong( len );
       Text.writeString(out, clientName);
+      accessToken.write(out);
       out.flush();
       
       //
@@ -1293,10 +1321,16 @@
           new BufferedInputStream(NetUtils.getInputStream(sock), 
                                   bufferSize));
       
-      if ( in.readShort() != DataTransferProtocol.OP_STATUS_SUCCESS ) {
-        throw new IOException("Got error in response to OP_READ_BLOCK " +
-                              "for file " + file + 
-                              " for block " + blockId);
+      short status = in.readShort();
+      if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
+        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+          throw new InvalidAccessTokenException(
+              "Got access token error in response to OP_READ_BLOCK "
+                  + "for file " + file + " for block " + blockId);
+        } else {
+          throw new IOException("Got error in response to OP_READ_BLOCK "
+              + "for file " + file + " for block " + blockId);
+        }
       }
       DataChecksum checksum = DataChecksum.newDataChecksum( in );
       //Warning when we get CHECKSUM_NULL?
@@ -1443,7 +1477,7 @@
      * @return located block
      * @throws IOException
      */
-    private LocatedBlock getBlockAt(long offset) throws IOException {
+    private synchronized LocatedBlock getBlockAt(long offset) throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
       // search cached blocks first
       int targetBlockIdx = locatedBlocks.findBlock(offset);
@@ -1463,6 +1497,32 @@
       return blk;
     }
 
+    /** Fetch a block from namenode and cache it */
+    private synchronized void fetchAndCacheBlockAt(long offset) throws IOException {
+      int targetBlockIdx = locatedBlocks.findBlock(offset);
+      if (targetBlockIdx < 0) { // block is not cached
+        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+      }
+      // fetch blocks
+      LocatedBlocks newBlocks;
+      newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
+      if (newBlocks == null) {
+        throw new IOException("Could not find target position " + offset);
+      }
+      locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+    }
+
+    /** Fetch a block without caching */
+    private LocatedBlock fetchBlockAt(long offset) throws IOException {
+      LocatedBlocks newBlocks;
+      newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
+      if (newBlocks == null) {
+        throw new IOException("Could not find target position " + offset);
+      }
+      int index = newBlocks.findBlock(offset);
+      return newBlocks.get(index);
+    }
+    
     /**
      * Get blocks in the specified range.
      * Fetch them from the namenode if not cached.
@@ -1524,17 +1584,18 @@
       }
 
       //
-      // Compute desired block
-      //
-      LocatedBlock targetBlock = getBlockAt(target);
-      assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
-      long offsetIntoBlock = target - targetBlock.getStartOffset();
-
-      //
       // Connect to best DataNode for desired Block, with potential offset
       //
       DatanodeInfo chosenNode = null;
-      while (s == null) {
+      int refetchToken = 1; // only need to get a new access token once
+      while (true) {
+        //
+        // Compute desired block
+        //
+        LocatedBlock targetBlock = getBlockAt(target);
+        assert (target==this.pos) : "Wrong position " + pos + " expect " + target;
+        long offsetIntoBlock = target - targetBlock.getStartOffset();
+
         DNAddrPair retval = chooseDataNode(targetBlock);
         chosenNode = retval.info;
         InetSocketAddress targetAddr = retval.addr;
@@ -1544,17 +1605,33 @@
           NetUtils.connect(s, targetAddr, socketTimeout);
           s.setSoTimeout(socketTimeout);
           Block blk = targetBlock.getBlock();
+          AccessToken accessToken = targetBlock.getAccessToken();
           
           blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
+              accessToken, 
               blk.getGenerationStamp(),
               offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
               buffersize, verifyChecksum, clientName);
           return chosenNode;
         } catch (IOException ex) {
-          // Put chosen node into dead list, continue
           LOG.debug("Failed to connect to " + targetAddr + ":" 
                     + StringUtils.stringifyException(ex));
-          addToDeadNodes(chosenNode);
+          if (ex instanceof InvalidAccessTokenException && refetchToken-- > 0) {
+            /*
+             * Get a new access token and retry. Retry is needed in two cases:
+             * 1) both NN and DN restarted while DFSClient was holding a cached
+             * access token; 2) NN failed to update its access keys at the
+             * pre-set interval (by a wide margin) and subsequently restarted.
+             * In the latter case, DN re-registers itself with NN and receives
+             * a new access key, but DN deletes the old access key from its
+             * memory since it is considered expired based on the estimated
+             * expiry date.
+             */
+            fetchAndCacheBlockAt(target);
+          } else {
+            // Put chosen node into dead list, continue
+            addToDeadNodes(chosenNode);
+          }
           if (s != null) {
             try {
               s.close();
@@ -1564,7 +1641,6 @@
           s = null;
         }
       }
-      return chosenNode;
     }
 
     /**
@@ -1734,6 +1810,7 @@
       Socket dn = null;
       int numAttempts = block.getLocations().length;
       IOException ioe = null;
+      int refetchToken = 1; // only need to get a new access token once
       
       while (dn == null && numAttempts-- > 0 ) {
         DNAddrPair retval = chooseDataNode(block);
@@ -1745,11 +1822,13 @@
           dn = socketFactory.createSocket();
           NetUtils.connect(dn, targetAddr, socketTimeout);
           dn.setSoTimeout(socketTimeout);
+          AccessToken accessToken = block.getAccessToken();
               
           int len = (int) (end - start + 1);
               
           reader = BlockReader.newBlockReader(dn, src, 
                                               block.getBlock().getBlockId(),
+                                              accessToken,
                                               block.getBlock().getGenerationStamp(),
                                               start, len, buffersize, 
                                               verifyChecksum, clientName);
@@ -1767,10 +1846,20 @@
           reportChecksumFailure(src, block.getBlock(), chosenNode);
         } catch (IOException e) {
           ioe = e;
-          LOG.warn("Failed to connect to " + targetAddr + 
-                   " for file " + src + 
-                   " for block " + block.getBlock().getBlockId() + ":"  +
-                   StringUtils.stringifyException(e));
+          if (e instanceof InvalidAccessTokenException && refetchToken-- > 0) {
+            LOG.info("Invalid access token when connecting to " + targetAddr
+                + " for file " + src + " for block "
+                + block.getBlock() + ":"
+                + StringUtils.stringifyException(e)
+                + ", get a new access token and retry...");
+            block = fetchBlockAt(block.getStartOffset());
+            numAttempts = block.getLocations().length;
+            continue;
+          } else {
+            LOG.warn("Failed to connect to " + targetAddr + " for file " + src
+                + " for block " + block.getBlock() + ":"
+                + StringUtils.stringifyException(e));
+          }
         } finally {
           IOUtils.closeStream(reader);
           IOUtils.closeSocket(dn);
@@ -2004,6 +2093,7 @@
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;
     private Block block;
+    private AccessToken accessToken;
     final private long blockSize;
     private DataChecksum checksum;
     private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
@@ -2523,6 +2613,7 @@
         //
         if (newBlock != null) {
           block = newBlock.getBlock();
+          accessToken = newBlock.getAccessToken();
           nodes = newBlock.getLocations();
         }
 
@@ -2707,6 +2798,7 @@
         long startTime = System.currentTimeMillis();
         lb = locateFollowingBlock(startTime);
         block = lb.getBlock();
+        accessToken = lb.getAccessToken();
         nodes = lb.getLocations();
   
         //
@@ -2785,6 +2877,7 @@
         for (int i = 1; i < nodes.length; i++) {
           nodes[i].write(out);
         }
+        accessToken.write(out);
         checksum.writeHeader( out );
         out.flush();
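
The refetchToken counter above gives a retry-at-most-once policy on InvalidAccessTokenException. Stripped of the DFSClient plumbing, the idiom looks like this sketch, where readBlock and refreshToken are hypothetical stand-ins:

  import java.io.IOException;
  import org.apache.hadoop.security.InvalidAccessTokenException;

  public class RetryOnceSketch {
    static int attempts = 0;

    // Stand-in for the datanode read: fails once with a stale token.
    static String readBlock() throws IOException {
      if (attempts++ == 0) throw new InvalidAccessTokenException("stale token");
      return "block data";
    }

    // Stand-in for callGetBlockLocations(), which returns fresh tokens.
    static void refreshToken() { }

    public static void main(String[] args) throws IOException {
      int refetchToken = 1;   // as in DFSClient: fetch a new token at most once
      while (true) {
        try {
          System.out.println(readBlock());
          return;
        } catch (InvalidAccessTokenException e) {
          if (refetchToken-- > 0) {
            refreshToken();
            continue;
          }
          throw e;   // a second token failure is fatal, as in the patch
        }
      }
    }
  }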
 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed May 13 17:02:29 2009
@@ -41,9 +41,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 43: added restoreFailedStorage command
+   * 44: All LocatedBlock objects contain access tokens
    */
-  public static final long versionID = 43L;
+  public static final long versionID = 44L;
   
   ///////////////////////////////////////
   // File contents

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Wed May 13 17:02:29 2009
@@ -31,15 +31,11 @@
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 14:
-   *    OP_REPLACE_BLOCK is sent from the Balancer server to the destination,
-   *    including the block id, source, and proxy.
-   *    OP_COPY_BLOCK is sent from the destination to the proxy, which contains
-   *    only the block id.
-   *    A reply to OP_COPY_BLOCK sends the block content.
-   *    A reply to OP_REPLACE_BLOCK includes an operation status.
+   * Version 15:
+   *    Added a new status OP_STATUS_ERROR_ACCESS_TOKEN
+   *    Access token is now required on all DN operations
    */
-  public static final int DATA_TRANSFER_VERSION = 14;
+  public static final int DATA_TRANSFER_VERSION = 15;
 
   // Processed at datanode stream-handler
   public static final byte OP_WRITE_BLOCK = (byte) 80;
@@ -54,7 +50,8 @@
   public static final int OP_STATUS_ERROR_CHECKSUM = 2;  
   public static final int OP_STATUS_ERROR_INVALID = 3;  
   public static final int OP_STATUS_ERROR_EXISTS = 4;  
-  public static final int OP_STATUS_CHECKSUM_OK = 5;  
+  public static final int OP_STATUS_ERROR_ACCESS_TOKEN = 5;
+  public static final int OP_STATUS_CHECKSUM_OK = 6;
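
Under version 15 a client reads the leading status short of every reply and maps the new code to InvalidAccessTokenException, as DFSClient does above. A hedged sketch of that mapping (the helper and the demo input are hypothetical; OP_STATUS_SUCCESS is assumed to be 0):

  import java.io.ByteArrayInputStream;
  import java.io.DataInputStream;
  import java.io.IOException;
  import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
  import org.apache.hadoop.security.InvalidAccessTokenException;

  public class ReplyStatusSketch {
    static void checkReply(DataInputStream in) throws IOException {
      short status = in.readShort();
      if (status == DataTransferProtocol.OP_STATUS_SUCCESS) return;
      if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN)
        throw new InvalidAccessTokenException("datanode rejected the access token");
      throw new IOException("operation failed with status " + status);
    }

    public static void main(String[] args) throws IOException {
      byte[] ok = {0, 0};   // a big-endian short; assumes OP_STATUS_SUCCESS == 0
      checkReply(new DataInputStream(new ByteArrayInputStream(ok)));
      System.out.println("success status accepted");
    }
  }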
 
 
 

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Wed May 13 17:02:29 2009
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.security.AccessToken;
 
 import java.io.*;
 
@@ -43,6 +44,7 @@
   // else false. If block has few corrupt replicas, they are filtered and 
   // their locations are not part of this object
   private boolean corrupt;
+  private AccessToken accessToken = new AccessToken();
 
   /**
    */
@@ -76,6 +78,14 @@
     }
   }
 
+  public AccessToken getAccessToken() {
+    return accessToken;
+  }
+
+  public void setAccessToken(AccessToken token) {
+    this.accessToken = token;
+  }
+
   /**
    */
   public Block getBlock() {
@@ -112,6 +122,7 @@
   // Writable
   ///////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
+    accessToken.write(out);
     out.writeBoolean(corrupt);
     out.writeLong(offset);
     b.write(out);
@@ -122,6 +133,7 @@
   }
 
   public void readFields(DataInput in) throws IOException {
+    accessToken.readFields(in);
     this.corrupt = in.readBoolean();
     offset = in.readLong();
     this.b = new Block();

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed May 13 17:02:29 2009
@@ -33,6 +33,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.Formatter;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -75,8 +76,12 @@
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -194,6 +199,11 @@
   private NamenodeProtocol namenode;
   private ClientProtocol client;
   private FileSystem fs;
+  private boolean isAccessTokenEnabled;
+  private boolean shouldRun;
+  private long keyUpdaterInterval;
+  private AccessTokenHandler accessTokenHandler;
+  private Daemon keyupdaterthread = null; // AccessKeyUpdater thread
   private final static Random rnd = new Random();
   
   // all data node lists
@@ -363,6 +373,13 @@
       out.writeLong(block.getBlock().getGenerationStamp());
       Text.writeString(out, source.getStorageID());
       proxySource.write(out);
+      AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+      if (isAccessTokenEnabled) {
+        accessToken = accessTokenHandler.generateToken(null, block.getBlock()
+            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.REPLACE,
+            AccessTokenHandler.AccessMode.COPY));
+      }
+      accessToken.write(out);
       out.flush();
     }
     
@@ -370,6 +387,8 @@
     private void receiveResponse(DataInputStream in) throws IOException {
       short status = in.readShort();
       if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
+        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN)
+          throw new IOException("block move failed due to access token error");
         throw new IOException("block move is failed");
       }
     }
@@ -845,6 +864,48 @@
     this.namenode = createNamenode(conf);
     this.client = DFSClient.createNamenode(conf);
     this.fs = FileSystem.get(conf);
+    ExportedAccessKeys keys = namenode.getAccessKeys();
+    this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
+    if (isAccessTokenEnabled) {
+      long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
+      long accessTokenLifetime = keys.getTokenLifetime();
+      LOG.info("Access token params received from NN: keyUpdateInterval="
+          + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+          + accessTokenLifetime / (60 * 1000) + " min(s)");
+      this.accessTokenHandler = new AccessTokenHandler(false,
+          accessKeyUpdateInterval, accessTokenLifetime);
+      this.accessTokenHandler.setKeys(keys);
+      /*
+       * Balancer should sync its access keys with NN more frequently than NN
+       * updates its access keys
+       */
+      this.keyUpdaterInterval = accessKeyUpdateInterval / 4;
+      LOG.info("Balancer will update its access keys every "
+          + keyUpdaterInterval / (60 * 1000) + " minute(s)");
+      this.keyupdaterthread = new Daemon(new AccessKeyUpdater());
+      this.shouldRun = true;
+      this.keyupdaterthread.start();
+    }
+  }
+  
+  /**
+   * Periodically updates access keys.
+   */
+  class AccessKeyUpdater implements Runnable {
+
+    public void run() {
+      while (shouldRun) {
+        try {
+          accessTokenHandler.setKeys(namenode.getAccessKeys());
+        } catch (Exception e) {
+          LOG.error(StringUtils.stringifyException(e));
+        }
+        try {
+          Thread.sleep(keyUpdaterInterval);
+        } catch (InterruptedException ie) {
+        }
+      }
+    }
   }
   
   /* Build a NamenodeProtocol connection to the namenode and
@@ -861,6 +922,7 @@
     Map<String,RetryPolicy> methodNameToPolicyMap =
         new HashMap<String, RetryPolicy>();
     methodNameToPolicyMap.put("getBlocks", methodPolicy);
+    methodNameToPolicyMap.put("getAccessKeys", methodPolicy);
 
     UserGroupInformation ugi;
     try {
@@ -1506,6 +1568,12 @@
       dispatcherExecutor.shutdownNow();
       moverExecutor.shutdownNow();
 
+      shouldRun = false;
+      try {
+        if (keyupdaterthread != null) keyupdaterthread.interrupt();
+      } catch (Exception e) {
+        LOG.warn("Exception shutting down access key updater thread", e);
+      }
       // close the output file
       IOUtils.closeStream(out); 
       if (fs != null) {
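
The AccessKeyUpdater above keeps the Balancer's slave-mode handler in sync with the NN, polling four times per key-update interval so it cannot miss a rotation. A simplified sketch of the daemon lifecycle (the names and the short sleep standing in for the balancing run are illustrative):

  import org.apache.hadoop.util.Daemon;

  public class KeySyncSketch {
    static volatile boolean shouldRun = true;

    public static void main(String[] args) throws InterruptedException {
      final long keyUpdateInterval = 600L * 60 * 1000;       // NN default: 600 min
      final long keyUpdaterInterval = keyUpdateInterval / 4; // sync every 150 min
      Daemon updater = new Daemon(new Runnable() {
        public void run() {
          while (shouldRun) {
            // accessTokenHandler.setKeys(namenode.getAccessKeys()) would go here
            try {
              Thread.sleep(keyUpdaterInterval);
            } catch (InterruptedException ie) {
              // interrupted during shutdown; the loop re-checks shouldRun
            }
          }
        }
      });
      updater.start();
      Thread.sleep(100);   // stand-in for the balancing run
      shouldRun = false;   // shutdown mirrors the Balancer: clear the flag, then interrupt
      updater.interrupt();
    }
  }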

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed May 13 17:02:29 2009
@@ -34,6 +34,7 @@
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -71,6 +72,7 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
@@ -82,6 +84,9 @@
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.authorize.ConfiguredPolicy;
 import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -190,6 +195,9 @@
   int socketWriteTimeout = 0;  
   boolean transferToAllowed = true;
   int writePacketSize = 0;
+  boolean isAccessTokenEnabled;
+  AccessTokenHandler accessTokenHandler;
+  boolean isAccessTokenInitialized = false;
   
   public DataBlockScanner blockScanner = null;
   public Daemon blockScannerThread = null;
@@ -562,6 +570,27 @@
           + ". Expecting " + storage.getStorageID());
     }
     
+    if (!isAccessTokenInitialized) {
+      /* first time registering with NN */
+      ExportedAccessKeys keys = dnRegistration.exportedKeys;
+      this.isAccessTokenEnabled = keys.isAccessTokenEnabled();
+      if (isAccessTokenEnabled) {
+        long accessKeyUpdateInterval = keys.getKeyUpdateInterval();
+        long accessTokenLifetime = keys.getTokenLifetime();
+        LOG.info("Access token params received from NN: keyUpdateInterval="
+            + accessKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+            + accessTokenLifetime / (60 * 1000) + " min(s)");
+        this.accessTokenHandler = new AccessTokenHandler(false,
+            accessKeyUpdateInterval, accessTokenLifetime);
+      }
+      isAccessTokenInitialized = true;
+    }
+
+    if (isAccessTokenEnabled) {
+      accessTokenHandler.setKeys(dnRegistration.exportedKeys);
+      dnRegistration.exportedKeys = ExportedAccessKeys.DUMMY_KEYS;
+    }
+
     // random short delay - helps scatter the BR from all DNs
     scheduleBlockReport(initialBlockReportDelay);
   }
@@ -849,6 +878,12 @@
     case DatanodeProtocol.DNA_RECOVERBLOCK:
       recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
       break;
+    case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
+      LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
+      if (isAccessTokenEnabled) {
+        accessTokenHandler.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
+      }
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
@@ -1187,6 +1222,12 @@
         for (int i = 1; i < targets.length; i++) {
           targets[i].write(out);
         }
+        AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+        if (isAccessTokenEnabled) {
+          accessToken = accessTokenHandler.generateToken(null, b.getBlockId(),
+              EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
+        }
+        accessToken.write(out);
         // send data & checksum
         blockSender.sendBlock(out, baseStream, null);
 

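[Note: on the transfer path above, the DataNode's only new obligation is to put exactly one token on the wire (a dummy when the feature is off), so the receiving end can always read one. A sketch of that framing rule under stand-in types; the real code serializes AccessToken as a Writable, so the byte-array framing below is only illustrative:]

    import java.io.DataOutputStream;
    import java.io.IOException;

    // The sender always writes one token, real or dummy, so the stream layout
    // is identical whether or not access tokens are enabled. Stand-in framing;
    // the actual AccessToken writes itself via its Writable write(out) method.
    class TokenFramingSketch {
      interface TokenSource { byte[] generate(long blockId) throws IOException; }

      static void writeToken(DataOutputStream out, boolean tokensEnabled,
                             TokenSource handler, long blockId) throws IOException {
        byte[] token = tokensEnabled ? handler.generate(blockId)  // WRITE mode above
                                     : new byte[0];               // plays DUMMY_TOKEN
        out.writeInt(token.length);
        out.write(token);
      }
    }
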
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed May 13 17:02:29 2009
@@ -38,6 +38,8 @@
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessToken;
+import org.apache.hadoop.security.AccessTokenHandler;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
@@ -153,12 +155,26 @@
     long startOffset = in.readLong();
     long length = in.readLong();
     String clientName = Text.readString(in);
-    // send the block
+    AccessToken accessToken = new AccessToken();
+    accessToken.readFields(in);
     OutputStream baseStream = NetUtils.getOutputStream(s, 
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
     
+    if (datanode.isAccessTokenEnabled
+        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
+            AccessTokenHandler.AccessMode.READ)) {
+      try {
+        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+        out.flush();
+        throw new IOException("Access token verification failed, on client "
+            + "request for reading block " + block);
+      } finally {
+        IOUtils.closeStream(out);
+      }
+    }
+    // send the block
     BlockSender blockSender = null;
     final String clientTraceFmt =
       clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
@@ -246,10 +262,28 @@
       tmp.readFields(in);
       targets[i] = tmp;
     }
+    AccessToken accessToken = new AccessToken();
+    accessToken.readFields(in);
+    DataOutputStream replyOut = null;   // stream to prev target
+    replyOut = new DataOutputStream(
+                   NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+    if (datanode.isAccessTokenEnabled
+        && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
+            .getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
+      try {
+        if (client.length() != 0) {
+          Text.writeString(replyOut, datanode.dnRegistration.getName());
+          replyOut.flush();
+        }
+        throw new IOException("Access token verification failed, on client "
+            + "request for writing block " + block);
+      } finally {
+        IOUtils.closeStream(replyOut);
+      }
+    }
 
     DataOutputStream mirrorOut = null;  // stream to next target
     DataInputStream mirrorIn = null;    // reply from next target
-    DataOutputStream replyOut = null;   // stream to prev target
     Socket mirrorSock = null;           // socket to next target
     BlockReceiver blockReceiver = null; // responsible for data handling
     String mirrorNode = null;           // the name:port of next target
@@ -261,10 +295,6 @@
           s.getLocalSocketAddress().toString(),
           isRecovery, client, srcDataNode, datanode);
 
-      // get a connection back to the previous target
-      replyOut = new DataOutputStream(
-                     NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
-
       //
       // Open network conn to backup machine, if 
       // appropriate
@@ -304,6 +334,7 @@
           for ( int i = 1; i < targets.length; i++ ) {
             targets[i].write( mirrorOut );
           }
+          accessToken.write(mirrorOut);
 
           blockReceiver.writeChecksumHeader(mirrorOut);
           mirrorOut.flush();
@@ -429,8 +460,24 @@
    */
   void getBlockChecksum(DataInputStream in) throws IOException {
     final Block block = new Block(in.readLong(), 0 , in.readLong());
+    AccessToken accessToken = new AccessToken();
+    accessToken.readFields(in);
+    DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
+        datanode.socketWriteTimeout));
+    if (datanode.isAccessTokenEnabled
+        && !datanode.accessTokenHandler.checkAccess(accessToken, null, block
+            .getBlockId(), AccessTokenHandler.AccessMode.READ)) {
+      try {
+        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
+        out.flush();
+        throw new IOException(
+            "Access token verification failed on getBlockChecksum() "
+                + "for block " + block);
+      } finally {
+        IOUtils.closeStream(out);
+      }
+    }
 
-    DataOutputStream out = null;
     final MetaDataInputStream metadataIn = datanode.data.getMetaDataInputStream(block);
     final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
         metadataIn, BUFFER_SIZE));
@@ -452,8 +499,6 @@
       }
 
       //write reply
-      out = new DataOutputStream(
-          NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
       out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
       out.writeInt(bytesPerCRC);
       out.writeLong(crcPerBlock);
@@ -476,10 +521,24 @@
     // Read in the header
     long blockId = in.readLong(); // read block id
     Block block = new Block(blockId, 0, in.readLong());
+    AccessToken accessToken = new AccessToken();
+    accessToken.readFields(in);
+    if (datanode.isAccessTokenEnabled
+        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
+            AccessTokenHandler.AccessMode.COPY)) {
+      LOG.warn("Invalid access token in request from "
+          + s.getRemoteSocketAddress() + " for copying block " + block);
+      sendResponse(s,
+          (short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
+          datanode.socketWriteTimeout);
+      return;
+    }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       LOG.info("Not able to copy block " + blockId + " to " 
           + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
+      sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR, 
+          datanode.socketWriteTimeout);
       return;
     }
 
@@ -498,6 +557,8 @@
       reply = new DataOutputStream(new BufferedOutputStream(
           baseStream, SMALL_BUFFER_SIZE));
 
+      // send status first
+      reply.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
       // send block content to the target
       long read = blockSender.sendBlock(reply, baseStream, 
                                         dataXceiverServer.balanceThrottler);
@@ -538,6 +599,17 @@
     String sourceID = Text.readString(in); // read del hint
     DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
     proxySource.readFields(in);
+    AccessToken accessToken = new AccessToken();
+    accessToken.readFields(in);
+    if (datanode.isAccessTokenEnabled
+        && !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
+            AccessTokenHandler.AccessMode.REPLACE)) {
+      LOG.warn("Invalid access token in request from "
+          + s.getRemoteSocketAddress() + " for replacing block " + block);
+      sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
+          datanode.socketWriteTimeout);
+      return;
+    }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       LOG.warn("Not able to receive block " + blockId + " from " 
@@ -571,11 +643,22 @@
       proxyOut.writeByte(DataTransferProtocol.OP_COPY_BLOCK); // op code
       proxyOut.writeLong(block.getBlockId()); // block id
       proxyOut.writeLong(block.getGenerationStamp()); // generation stamp
+      accessToken.write(proxyOut);
       proxyOut.flush();
 
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(
           NetUtils.getInputStream(proxySock), BUFFER_SIZE));
+      short status = proxyReply.readShort();
+      if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
+        if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
+          throw new IOException("Copy block " + block + " from "
+              + proxySock.getRemoteSocketAddress()
+              + " failed due to access token error");
+        }
+        throw new IOException("Copy block " + block + " from "
+            + proxySock.getRemoteSocketAddress() + " failed");
+      }
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),

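[Note: the verification hunks in this file share one shape: deserialize the token before anything else, check it, and on failure answer with OP_STATUS_ERROR_ACCESS_TOKEN so the client can distinguish a token problem from an ordinary I/O error, then close the connection. A condensed sketch of that shape; the status value and helper types are stand-ins, not the real DataTransferProtocol constants:]

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class VerifySketch {
      // Placeholder value; the real constant is
      // DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN.
      static final short OP_STATUS_ERROR_ACCESS_TOKEN = -1;

      interface Verifier { boolean checkAccess(byte[] token, long blockId); }

      static void checkOrFail(DataInputStream in, DataOutputStream out,
                              Verifier verifier, long blockId) throws IOException {
        byte[] token = new byte[in.readInt()];   // token travels ahead of the payload
        in.readFully(token);
        if (!verifier.checkAccess(token, blockId)) {
          try {
            out.writeShort(OP_STATUS_ERROR_ACCESS_TOKEN);  // tell the peer why
            out.flush();
            throw new IOException("Access token verification failed for block "
                + blockId);
          } finally {
            out.close();  // the hunks above use IOUtils.closeStream(out)
          }
        }
      }
    }
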
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Wed May 13 17:02:29 2009
@@ -5,6 +5,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -22,6 +23,7 @@
 import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
+import org.apache.hadoop.security.AccessTokenHandler;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
@@ -219,7 +221,7 @@
 
 
   List<LocatedBlock> getBlockLocations(Block[] blocks, long offset,
-      long length, int nrBlocksToReturn) {
+      long length, int nrBlocksToReturn) throws IOException {
     int curBlk = 0;
     long curPos = 0, blkSize = 0;
     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
@@ -263,8 +265,13 @@
             machineSet[numNodes++] = dn;
         }
       }
-      results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,
-          blockCorrupt));
+      LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
+          blockCorrupt);
+      if (namesystem.isAccessTokenEnabled) {
+        b.setAccessToken(namesystem.accessTokenHandler.generateToken(b.getBlock()
+            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.READ)));
+      }
+      results.add(b);
       curPos += blocks[curBlk].getNumBytes();
       curBlk++;
     } while (curPos < endOff && curBlk < blocks.length

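[Note: the BlockManager change above means every LocatedBlock leaves the NameNode already stamped with a READ-mode token, so clients need no separate token RPC. A sketch of that stamping step; LocatedBlockStub and TokenMinter are illustrative stand-ins, not HDFS classes:]

    import java.util.ArrayList;
    import java.util.List;

    class LocateSketch {
      interface TokenMinter { byte[] mint(long blockId); }   // READ mode above

      static class LocatedBlockStub {
        final long blockId;
        byte[] accessToken;                                  // null when disabled
        LocatedBlockStub(long blockId) { this.blockId = blockId; }
      }

      // Mirrors getBlockLocations(): build the located block, then attach a
      // token only when the feature is on, exactly as the hunk above does.
      static List<LocatedBlockStub> locate(long[] blockIds, boolean tokensEnabled,
                                           TokenMinter minter) {
        List<LocatedBlockStub> results = new ArrayList<LocatedBlockStub>();
        for (long id : blockIds) {
          LocatedBlockStub b = new LocatedBlockStub(id);
          if (tokensEnabled) {
            b.accessToken = minter.mint(id);
          }
          results.add(b);
        }
        return results;
      }
    }
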
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Wed May 13 17:02:29 2009
@@ -85,6 +85,7 @@
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
+  protected boolean needKeyUpdate = false;
 
   /** A queue of blocks to be replicated by this datanode */
   private BlockQueue replicateBlocks = new BlockQueue();

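[Note: the new needKeyUpdate flag is the hook that lets key rotation piggyback on heartbeats: FSNamesystem (below) sets it on every live node after rotating keys, and the node's next heartbeat drains it as a KeyUpdateCommand. A hedged reconstruction of that command's shape, inferred only from its two call sites in this diff (built from exportKeys() on the NameNode, read via getExportedKeys() on the DataNode); the real class added by this commit also extends DatanodeCommand and is a Writable:]

    // Inferred shape only; KeysStub stands in for ExportedAccessKeys.
    class KeyUpdateCommandSketch {
      private final KeysStub exportedKeys;

      KeyUpdateCommandSketch(KeysStub exportedKeys) {
        this.exportedKeys = exportedKeys;
      }

      KeysStub getExportedKeys() {
        return exportedKeys;
      }

      static class KeysStub { /* serialized key material lives here */ }
    }
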
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=774433&r1=774432&r2=774433&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed May 13 17:02:29 2009
@@ -30,6 +30,8 @@
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.AccessTokenHandler;
+import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.PermissionChecker;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -45,6 +47,7 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -127,6 +130,10 @@
   private FSNamesystemMetrics myFSMetrics;
   private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
   private int totalLoad = 0;
+  boolean isAccessTokenEnabled;
+  AccessTokenHandler accessTokenHandler;
+  private long accessKeyUpdateInterval;
+  private long accessTokenLifetime;
 
   //
   // Stores the correct file name hierarchy
@@ -255,6 +262,10 @@
     this.safeMode = new SafeModeInfo(conf);
     this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
                         conf.get("dfs.hosts.exclude",""));
+    if (isAccessTokenEnabled) {
+      accessTokenHandler = new AccessTokenHandler(true,
+          accessKeyUpdateInterval, accessTokenLifetime);
+    }
   }
 
   /**
@@ -392,6 +403,18 @@
                                          20*(int)(heartbeatInterval/1000));
     this.accessTimePrecision = conf.getLong("dfs.access.time.precision", 0);
     this.supportAppends = conf.getBoolean("dfs.support.append", false);
+    this.isAccessTokenEnabled = conf.getBoolean(
+        AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false);
+    if (isAccessTokenEnabled) {
+      this.accessKeyUpdateInterval = conf.getLong(
+          AccessTokenHandler.STRING_ACCESS_KEY_UPDATE_INTERVAL, 600) * 60 * 1000L; // default: 600 min = 10 hrs
+      this.accessTokenLifetime = conf.getLong(
+          AccessTokenHandler.STRING_ACCESS_TOKEN_LIFETIME, 600) * 60 * 1000L; // default: 600 min = 10 hrs
+    }
+    LOG.info("isAccessTokenEnabled=" + isAccessTokenEnabled
+        + " accessKeyUpdateInterval=" + accessKeyUpdateInterval / (60 * 1000)
+        + " min(s), accessTokenLifetime=" + accessTokenLifetime / (60 * 1000)
+        + " min(s)");
   }
 
   /**
@@ -529,6 +552,16 @@
   }
 
   /**
+   * Get access keys
+   * 
+   * @return current access keys
+   */
+  ExportedAccessKeys getAccessKeys() {
+    return isAccessTokenEnabled ? accessTokenHandler.exportKeys()
+        : ExportedAccessKeys.DUMMY_KEYS;
+  }
+
+  /**
    * Get all valid locations of the block & add the block to results
    * return the length of the added block; 0 if the block is not added
    */
@@ -654,7 +687,8 @@
                                                        long offset, 
                                                        long length,
                                                        int nrBlocksToReturn,
-                                                       boolean doAccessTime) {
+                                                       boolean doAccessTime
+                                                       ) throws IOException {
     if(inode == null) {
       return null;
     }
@@ -987,6 +1021,10 @@
 
           lb = new LocatedBlock(last, targets, 
                                 fileLength-storedBlock.getNumBytes());
+          if (isAccessTokenEnabled) {
+            lb.setAccessToken(accessTokenHandler.generateToken(lb.getBlock()
+                .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+          }
 
           // Remove block from replication queue.
           blockManager.updateNeededReplications(last, 0, 0);
@@ -1093,7 +1131,12 @@
     }
         
     // Create next block
-    return new LocatedBlock(newBlock, targets, fileLength);
+    LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
+    if (isAccessTokenEnabled) {
+      b.setAccessToken(accessTokenHandler.generateToken(b.getBlock()
+          .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
+    }
+    return b;
   }
 
   /**
@@ -1695,6 +1738,7 @@
                                       nodeReg.getInfoPort(),
                                       nodeReg.getIpcPort());
     nodeReg.updateRegInfo(dnReg);
+    nodeReg.exportedKeys = getAccessKeys();
       
     NameNode.stateChangeLog.info(
                                  "BLOCK* NameSystem.registerDatanode: "
@@ -1894,7 +1938,7 @@
           return new DatanodeCommand[] {cmd};
         }
       
-        ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(2);
+        ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
         //check pending replication
         cmd = nodeinfo.getReplicationCommand(
               blockManager.maxReplicationStreams - xmitsInProgress);
@@ -1906,6 +1950,11 @@
         if (cmd != null) {
           cmds.add(cmd);
         }
+        // check access key update
+        if (isAccessTokenEnabled && nodeinfo.needKeyUpdate) {
+          cmds.add(new KeyUpdateCommand(accessTokenHandler.exportKeys()));
+          nodeinfo.needKeyUpdate = false;
+        }
         if (!cmds.isEmpty()) {
           return cmds.toArray(new DatanodeCommand[cmds.size()]);
         }
@@ -1937,21 +1986,44 @@
       totalLoad -= node.getXceiverCount();
     }
   }
+
+  /**
+   * Update access keys.
+   */
+  void updateAccessKey() throws IOException {
+    this.accessTokenHandler.updateKeys();
+    synchronized (heartbeats) {
+      for (DatanodeDescriptor nodeInfo : heartbeats) {
+        nodeInfo.needKeyUpdate = true;
+      }
+    }
+  }
+
   /**
-   * Periodically calls heartbeatCheck().
+   * Periodically calls heartbeatCheck() and updateAccessKey().
    */
   class HeartbeatMonitor implements Runnable {
+    private long lastHeartbeatCheck;
+    private long lastAccessKeyUpdate;
     /**
      */
     public void run() {
       while (fsRunning) {
         try {
-          heartbeatCheck();
+          long now = now();
+          if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
+            heartbeatCheck();
+            lastHeartbeatCheck = now;
+          }
+          if (isAccessTokenEnabled && (lastAccessKeyUpdate + accessKeyUpdateInterval < now)) {
+            updateAccessKey();
+            lastAccessKeyUpdate = now;
+          }
         } catch (Exception e) {
           FSNamesystem.LOG.error(StringUtils.stringifyException(e));
         }
         try {
-          Thread.sleep(heartbeatRecheckInterval);
+          Thread.sleep(5000);  // 5 seconds
         } catch (InterruptedException ie) {
         }
       }

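[Note: two operational points on the FSNamesystem hunks above. First, HeartbeatMonitor now polls on a fixed 5-second tick with separate timestamps, so heartbeat checks and key rotation run on independent schedules instead of both being tied to heartbeatRecheckInterval. Second, the feature is off by default and reads its settings through the AccessTokenHandler constants; a sketch of enabling it programmatically, using only the constant names this diff references (their string values live in hdfs-default.xml and are not shown here) and the 600-minute defaults the code falls back to:]

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.AccessTokenHandler;

    // Illustrative only: enables access tokens with the same 600-minute
    // (10-hour) values the hunk above uses as defaults.
    class EnableTokensSketch {
      static Configuration configure() {
        Configuration conf = new Configuration();
        conf.setBoolean(AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, true);
        conf.setLong(AccessTokenHandler.STRING_ACCESS_KEY_UPDATE_INTERVAL, 600); // minutes
        conf.setLong(AccessTokenHandler.STRING_ACCESS_TOKEN_LIFETIME, 600);      // minutes
        return conf;
      }
    }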

