hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r651699 - in /hadoop/core/trunk: ./ conf/ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/net/
Date Fri, 25 Apr 2008 21:05:03 GMT
Author: dhruba
Date: Fri Apr 25 14:05:01 2008
New Revision: 651699

URL: http://svn.apache.org/viewvc?rev=651699&view=rev
Log:
HADOOP-3283. The Datanode has a RPC server. It currently supports
two RPCs: the first RPC retrieves the metadata about a block and the
second RPC sets the generation stamp of an existing block.
(Tsz Wo (Nicholas), SZE via dhruba)


Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java   (with props)
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java   (with props)
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java   (with props)
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java
    hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java
    hadoop/core/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Apr 25 14:05:01 2008
@@ -12,6 +12,11 @@
     HADOOP-2865. FsShell.ls() printout format changed to print file names
     in the end of the line. (Edward J. Yoon via shv)
 
+    HADOOP-3283. The Datanode has a RPC server. It currently supports
+    two RPCs: the first RPC retrieves the metadata about a block and the
+    second RPC sets the generation stamp of an existing block.
+    (Tsz Wo (Nicholas), SZE via dhruba)
+
   NEW FEATURES
 
     HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Fri Apr 25 14:05:01 2008
@@ -262,6 +262,21 @@
 </property>
 
 <property>
+  <name>dfs.datanode.ipc.address</name>
+  <value>0.0.0.0:50020</value>
+  <description>
+    The datanode ipc server address and port.
+    If the port is 0 then the server will start on a free port.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.handler.count</name>
+  <value>3</value>
+  <description>The number of server threads for the datanode.</description>
+</property>
+
+<property>
   <name>dfs.http.address</name>
   <value>0.0.0.0:50070</value>
   <description>

Added: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java?rev=651699&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java Fri Apr 25 14:05:01
2008
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.*;
+
+import org.apache.hadoop.io.*;
+
+/**
+ * Meta data information for a block
+ */
+class BlockMetaDataInfo extends Block {
+  static final WritableFactory FACTORY = new WritableFactory() {
+    public Writable newInstance() { return new BlockMetaDataInfo(); }
+  };
+  static {                                      // register a ctor
+    WritableFactories.setFactory(BlockMetaDataInfo.class, FACTORY);
+  }
+
+  /** get BlockMetaDataInfo from the data set and the block scanner */
+  static BlockMetaDataInfo getBlockMetaDataInfo(Block b,
+      FSDatasetInterface dataset, DataBlockScanner blockscanner
+      ) throws IOException {
+    BlockMetaDataInfo info = new BlockMetaDataInfo();
+    info.blkid = b.getBlockId();
+    info.lastScanTime = blockscanner.getLastScanTime(b);
+    info.len = dataset.getLength(b);
+    //TODO: get generation stamp here
+    return info;
+  }
+
+  //TODO: remove generationStamp if it is defined in Block
+  private long generationStamp;
+  private long lastScanTime;
+
+  public BlockMetaDataInfo() {}
+
+  long getLastScanTime() {return lastScanTime;}
+
+  long getGenerationStamp() {return generationStamp;}
+
+  /** {@inheritDoc} */
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeLong(generationStamp);
+    out.writeLong(lastScanTime);
+  }
+
+  /** {@inheritDoc} */
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    generationStamp = in.readLong();
+    lastScanTime = in.readLong();
+  }
+}
\ No newline at end of file

Propchange: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlockMetaDataInfo.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataBlockScanner.java Fri Apr 25 14:05:01
2008
@@ -252,7 +252,13 @@
       delBlockInfo(info);
     }
   }
-  
+
+  /** @return the last scan time */
+  synchronized long getLastScanTime(Block block) {
+    BlockScanInfo info = blockMap.get(block);
+    return info == null? 0: info.lastScanTime;
+  }
+
   /** Deletes blocks from internal structures */
   void deleteBlocks(Block[] blocks) {
     for ( Block b : blocks ) {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Fri Apr 25 14:05:01 2008
@@ -79,7 +79,7 @@
  * information to clients or other DataNodes that might be interested.
  *
  **********************************************************/
-public class DataNode implements FSConstants, Runnable {
+public class DataNode implements InterDatanodeProtocol, FSConstants, Runnable {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DataNode");
 
   /**
@@ -131,8 +131,8 @@
   private int socketWriteTimeout = 0;  
   private boolean transferToAllowed = true;
   
-  private DataBlockScanner blockScanner;
-  private Daemon blockScannerThread;
+  DataBlockScanner blockScanner;
+  Daemon blockScannerThread;
   
   private static final Random R = new Random();
 
@@ -152,6 +152,9 @@
   long balanceBandwidth;
   private Throttler balancingThrottler;
 
+  // For InterDataNodeProtocol
+  Server ipcServer;
+  
   // Record all sockets opend for data transfer
   Map<Socket, Socket> childSockets = Collections.synchronizedMap(
                                        new HashMap<Socket, Socket>());
@@ -281,7 +284,7 @@
     selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
                                      tmpPort);
     this.dnRegistration.setName(machineName + ":" + tmpPort);
-    LOG.info("Opened server at " + tmpPort);
+    LOG.info("Opened info server at " + tmpPort);
       
     this.threadGroup = new ThreadGroup("dataXceiveServer");
     this.dataXceiveServer = new Daemon(threadGroup, new DataXceiveServer(ss));
@@ -347,6 +350,16 @@
     // adjust info port
     this.dnRegistration.setInfoPort(this.infoServer.getPort());
     myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
+    
+    //init ipc server
+    InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
+        conf.get("dfs.datanode.ipc.address"));
+    ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(), 
+        conf.getInt("dfs.datanode.handler.count", 3), false, conf);
+    ipcServer.start();
+    dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
+
+    LOG.info("dnRegistration = " + dnRegistration);
   }
 
   /**
@@ -397,6 +410,17 @@
     return datanodeObject;
   } 
 
+  static InterDatanodeProtocol createInterDataNodeProtocolProxy(
+      DatanodeID datanodeid, Configuration conf) throws IOException {
+    InetSocketAddress addr = NetUtils.createSocketAddr(
+        datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+    if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
+      InterDatanodeProtocol.LOG.info("InterDatanodeProtocol addr=" + addr);
+    }
+    return (InterDatanodeProtocol)RPC.waitForProxy(InterDatanodeProtocol.class,
+        InterDatanodeProtocol.versionID, addr, conf);
+  }
+
   public InetSocketAddress getNameNodeAddr() {
     return nameNodeAddr;
   }
@@ -504,6 +528,9 @@
       } catch (Exception e) {
       }
     }
+    if (ipcServer != null) {
+      ipcServer.stop();
+    }
     this.shouldRun = false;
     if (dataXceiveServer != null) {
       ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
@@ -2924,6 +2951,35 @@
     } catch (Throwable e) {
       LOG.error(StringUtils.stringifyException(e));
       System.exit(-1);
+    }
+  }
+
+  // InterDataNodeProtocol implementation
+  /** {@inheritDoc} */
+  public BlockMetaDataInfo getBlockMetaDataInfo(Block block
+      ) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("block=" + block);
+    }
+    return BlockMetaDataInfo.getBlockMetaDataInfo(block, data, blockScanner);
+  }
+
+  /** {@inheritDoc} */
+  public boolean updateGenerationStamp(Block block, GenerationStamp generationstamp) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("block=" + block + ", generationstamp=" + generationstamp);
+    }
+    //TODO: update generation stamp here
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  public long getProtocolVersion(String protocol, long clientVersion
+      ) throws IOException {
+    if (protocol.equals(InterDatanodeProtocol.class.getName())) {
+      return InterDatanodeProtocol.versionID; 
+    } else {
+      throw new IOException("Unknown protocol to name node: " + protocol);
     }
   }
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeID.java Fri Apr 25 14:05:01 2008
@@ -36,13 +36,13 @@
   protected String name;      /// hostname:portNumber
   protected String storageID; /// unique per cluster storageID
   protected int infoPort;     /// the port where the infoserver is running
+  protected int ipcPort;     /// the port where the ipc server is running
 
-  /**
-   * DatanodeID default constructor
-   */
-  public DatanodeID() {
-    this("", "", -1);
-  }
+  /** Equivalent to DatanodeID(""). */
+  public DatanodeID() {this("");}
+
+  /** Equivalent to DatanodeID(nodeName, "", -1, -1). */
+  public DatanodeID(String nodeName) {this(nodeName, "", -1, -1);}
 
   /**
    * DatanodeID copy constructor
@@ -50,19 +50,25 @@
    * @param from
    */
   public DatanodeID(DatanodeID from) {
-    this(from.getName(), from.getStorageID(), from.getInfoPort());
+    this(from.getName(),
+        from.getStorageID(),
+        from.getInfoPort(),
+        from.getIpcPort());
   }
   
   /**
    * Create DatanodeID
-   * 
    * @param nodeName (hostname:portNumber) 
    * @param storageID data storage ID
+   * @param infoPort info server port 
+   * @param ipcPort ipc server port
    */
-  public DatanodeID(String nodeName, String storageID, int infoPort) {
+  public DatanodeID(String nodeName, String storageID,
+      int infoPort, int ipcPort) {
     this.name = nodeName;
     this.storageID = storageID;
     this.infoPort = infoPort;
+    this.ipcPort = ipcPort;
   }
   
   /**
@@ -87,6 +93,13 @@
   }
 
   /**
+   * @return ipcPort (the port at which the IPC server bound to)
+   */
+  public int getIpcPort() {
+    return ipcPort;
+  }
+
+  /**
    * @sets data storage ID.
    */
   void setStorageID(String storageID) {
@@ -154,16 +167,14 @@
   /////////////////////////////////////////////////
   // Writable
   /////////////////////////////////////////////////
-  /**
-   */
+  /** {@inheritDoc} */
   public void write(DataOutput out) throws IOException {
     UTF8.writeString(out, name);
     UTF8.writeString(out, storageID);
     out.writeShort(infoPort);
   }
 
-  /**
-   */
+  /** {@inheritDoc} */
   public void readFields(DataInput in) throws IOException {
     name = UTF8.readString(in);
     storageID = UTF8.readString(in);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeInfo.java Fri Apr 25 14:05:01
2008
@@ -281,10 +281,13 @@
        });
   }
 
-  /**
-   */
+  /** {@inheritDoc} */
   public void write(DataOutput out) throws IOException {
     super.write(out);
+
+    //TODO: move it to DatanodeID once HADOOP-2797 has been committed
+    out.writeShort(ipcPort);
+
     out.writeLong(capacity);
     out.writeLong(dfsUsed);
     out.writeLong(remaining);
@@ -299,10 +302,13 @@
     WritableUtils.writeEnum(out, getAdminState());
   }
 
-  /**
-   */
+  /** {@inheritDoc} */
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
+
+    //TODO: move it to DatanodeID once HADOOP-2797 has been committed
+    this.ipcPort = in.readShort() & 0x0000ffff;
+
     this.capacity = in.readLong();
     this.dfsUsed = in.readLong();
     this.remaining = in.readLong();

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DatanodeRegistration.java Fri Apr 25
14:05:01 2008
@@ -47,15 +47,14 @@
    * Default constructor.
    */
   public DatanodeRegistration() {
-    super(null, null, -1);
-    this.storageInfo = new StorageInfo();
+    this("");
   }
   
   /**
    * Create DatanodeRegistration
    */
   public DatanodeRegistration(String nodeName) {
-    super(nodeName, "", -1);
+    super(nodeName);
     this.storageInfo = new StorageInfo();
   }
   
@@ -63,6 +62,10 @@
     this.infoPort = infoPort;
   }
   
+  void setIpcPort(int ipcPort) {
+    this.ipcPort = ipcPort;
+  }
+
   void setStorageInfo(DataStorage storage) {
     this.storageInfo = new StorageInfo(storage);
     this.storageID = storage.getStorageID();
@@ -84,22 +87,36 @@
     return Storage.getRegistrationID(storageInfo);
   }
 
+  public String toString() {
+    return getClass().getSimpleName()
+      + "(" + name
+      + ", storageID=" + storageID
+      + ", infoPort=" + infoPort
+      + ", ipcPort=" + ipcPort
+      + ")";
+  }
   /////////////////////////////////////////////////
   // Writable
   /////////////////////////////////////////////////
-  /**
-   */
+  /** {@inheritDoc} */
   public void write(DataOutput out) throws IOException {
     super.write(out);
+
+    //TODO: move it to DatanodeID once HADOOP-2797 has been committed
+    out.writeShort(ipcPort);
+
     out.writeInt(storageInfo.getLayoutVersion());
     out.writeInt(storageInfo.getNamespaceID());
     out.writeLong(storageInfo.getCTime());
   }
 
-  /**
-   */
+  /** {@inheritDoc} */
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
+
+    //TODO: move it to DatanodeID once HADOOP-2797 has been committed
+    this.ipcPort = in.readShort() & 0x0000ffff;
+
     storageInfo.layoutVersion = in.readInt();
     storageInfo.namespaceID = in.readInt();
     storageInfo.cTime = in.readLong();

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Fri Apr 25 14:05:01 2008
@@ -1253,9 +1253,7 @@
      * Datanode to be stored in the fsImage.
      */
     public void write(DataOutput out) throws IOException {
-      DatanodeID id = new DatanodeID(node.getName(), node.getStorageID(),
-                                     node.getInfoPort());
-      id.write(out);
+      new DatanodeID(node).write(out);
       out.writeLong(node.getCapacity());
       out.writeLong(node.getRemaining());
       out.writeLong(node.getLastUpdate());

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Fri Apr 25 14:05:01
2008
@@ -2035,7 +2035,8 @@
     // update the datanode's name with ip:port
     DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
                                       nodeReg.getStorageID(),
-                                      nodeReg.getInfoPort());
+                                      nodeReg.getInfoPort(),
+                                      nodeReg.getIpcPort());
     nodeReg.updateRegInfo(dnReg);
       
     NameNode.stateChangeLog.info(
@@ -3252,7 +3253,7 @@
     if (listDeadNodes) {
       for (Iterator<String> it = mustList.keySet().iterator(); it.hasNext();) {
         DatanodeDescriptor dn = 
-            new DatanodeDescriptor(new DatanodeID(it.next(), "", 0));
+            new DatanodeDescriptor(new DatanodeID(it.next()));
         dn.setLastUpdate(0);
         nodes.add(dn);
       }

Added: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java?rev=651699&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java Fri Apr 25
14:05:01 2008
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/** An inter-datanode protocol for updating generation stamp
+ */
+interface InterDatanodeProtocol extends VersionedProtocol {
+  public static final Log LOG = LogFactory.getLog(InterDatanodeProtocol.class);
+
+  /**
+   * 1: added getBlockMetaDataInfo and updateGenerationStamp
+   */
+  public static final long versionID = 1L;
+
+  /** @return the BlockMetaDataInfo of a block */
+  BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
+
+  /**
+   * Update the GenerationStamp of a block
+   * @return true iff update was required and done successfully 
+   */
+  boolean updateGenerationStamp(Block block, GenerationStamp generationstamp
+      ) throws IOException;
+}
\ No newline at end of file

Propchange: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/InterDatanodeProtocol.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/MiniDFSCluster.java Fri Apr 25 14:05:01
2008
@@ -18,11 +18,7 @@
 package org.apache.hadoop.dfs;
 
 import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.BufferedOutputStream;
-import java.io.PipedOutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -355,6 +351,7 @@
     // Set up the right ports for the datanodes
     conf.set("dfs.datanode.address", "127.0.0.1:0");
     conf.set("dfs.datanode.http.address", "127.0.0.1:0");
+    conf.set("dfs.datanode.ipc.address", "0.0.0.0:0");
     
     String[] args = (operation == null ||
                      operation == StartupOption.FORMAT ||
@@ -505,6 +502,16 @@
     return list;
   }
   
+  /** @return the datanode having the ipc server listen port */
+  DataNode getDataNode(int ipcPort) {
+    for(DataNode dn : getDataNodes()) {
+      if (dn.ipcServer.getListenerAddress().getPort() == ipcPort) {
+        return dn;
+      }
+    }
+    return null;
+  }
+
   /**
    * Gets the rpc port used by the NameNode, because the caller 
    * supplied port is not necessarily the actual port used.

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java Fri Apr 25 14:05:01
2008
@@ -23,14 +23,14 @@
 public class TestHost2NodesMap extends TestCase {
   static private Host2NodesMap map = new Host2NodesMap();
   private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
-    new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
-    new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"),
-    new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2"),
-    new DatanodeDescriptor(new DatanodeID("h3:5030", "0", -1), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h3:5030"), "/d1/r2"),
   };
   private final static DatanodeDescriptor NULL_NODE = null; 
   private final static DatanodeDescriptor NODE = 
-    new DatanodeDescriptor(new DatanodeID("h3:5040", "0", -1), "/d1/r4");
+    new DatanodeDescriptor(new DatanodeID("h3:5040"), "/d1/r4");
 
   static {
     for(DatanodeDescriptor node:dataNodes) {

Added: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java?rev=651699&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java Fri Apr
25 14:05:01 2008
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This tests InterDataNodeProtocol for block handling. 
+ */
+public class TestInterDatanodeProtocol extends junit.framework.TestCase {
+  public void testGetBlockMetaDataInfo() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster(conf, 3, true, null);
+      cluster.waitActive();
+
+      //create a file
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      String filepath = "/foo";
+      DFSTestUtil.createFile(dfs, new Path(filepath), 1024L, (short)3, 0L);
+      assertTrue(dfs.dfs.exists(filepath));
+
+      //get block info
+      ClientProtocol namenode = dfs.dfs.namenode;
+      LocatedBlocks locations = namenode.getBlockLocations(
+          filepath, 0, Long.MAX_VALUE);
+      List<LocatedBlock> blocks = locations.getLocatedBlocks();
+      assertTrue(blocks.size() > 0);
+
+      LocatedBlock locatedblock = blocks.get(0);
+      DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
+      assertTrue(datanodeinfo.length > 0);
+
+      //connect to a data node
+      InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
+          datanodeinfo[0], conf);
+      DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
+      assertTrue(datanode != null);
+      
+      //stop block scanner, so we could compare lastScanTime
+      datanode.blockScannerThread.interrupt();
+
+      //verify BlockMetaDataInfo
+      Block b = locatedblock.getBlock();
+      InterDatanodeProtocol.LOG.info("b=" + b + ", " + b.getClass());
+      BlockMetaDataInfo metainfo = idp.getBlockMetaDataInfo(b);
+      assertEquals(b.getBlockId(), metainfo.getBlockId());
+      assertEquals(b.getNumBytes(), metainfo.getNumBytes());
+      assertEquals(datanode.blockScanner.getLastScanTime(b),
+          metainfo.getLastScanTime());
+
+      //TODO: verify GenerationStamp
+      InterDatanodeProtocol.LOG.info("idp.updateGenerationStamp="
+          + idp.updateGenerationStamp(b, new GenerationStamp(456789L)));
+    }
+    finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+}
\ No newline at end of file

Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestInterDatanodeProtocol.java
------------------------------------------------------------------------------
    svn:keywords = Id Revision HeadURL

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java Fri Apr 25
14:05:01 2008
@@ -38,16 +38,16 @@
   private static ReplicationTargetChooser replicator;
   private static DatanodeDescriptor dataNodes[] = 
     new DatanodeDescriptor[] {
-      new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
-      new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"),
-      new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2"),
-      new DatanodeDescriptor(new DatanodeID("h4:5020", "0", -1), "/d1/r2"),
-      new DatanodeDescriptor(new DatanodeID("h5:5020", "0", -1), "/d2/r3"),
-      new DatanodeDescriptor(new DatanodeID("h6:5020", "0", -1), "/d2/r3")
+      new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
+      new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
+      new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r2"),
+      new DatanodeDescriptor(new DatanodeID("h4:5020"), "/d1/r2"),
+      new DatanodeDescriptor(new DatanodeID("h5:5020"), "/d2/r3"),
+      new DatanodeDescriptor(new DatanodeID("h6:5020"), "/d2/r3")
     };
    
   private final static DatanodeDescriptor NODE = 
-    new DatanodeDescriptor(new DatanodeID("h7:5020", "0", -1), "/d2/r4");
+    new DatanodeDescriptor(new DatanodeID("h7:5020"), "/d2/r4");
   
   static {
     try {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java?rev=651699&r1=651698&r2=651699&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java Fri Apr 25 14:05:01
2008
@@ -30,16 +30,16 @@
 public class TestNetworkTopology extends TestCase {
   private final static NetworkTopology cluster = new NetworkTopology();
   private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
-    new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
-    new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"),
-    new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2"),
-    new DatanodeDescriptor(new DatanodeID("h4:5020", "0", -1), "/d1/r2"),
-    new DatanodeDescriptor(new DatanodeID("h5:5020", "0", -1), "/d1/r2"),
-    new DatanodeDescriptor(new DatanodeID("h6:5020", "0", -1), "/d2/r3"),
-    new DatanodeDescriptor(new DatanodeID("h7:5020", "0", -1), "/d2/r3")
+    new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("h2:5020"), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("h3:5020"), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h4:5020"), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h5:5020"), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h6:5020"), "/d2/r3"),
+    new DatanodeDescriptor(new DatanodeID("h7:5020"), "/d2/r3")
   };
   private final static DatanodeDescriptor NODE = 
-    new DatanodeDescriptor(new DatanodeID("h8:5020", "0", -1), "/d2/r4");
+    new DatanodeDescriptor(new DatanodeID("h8:5020"), "/d2/r4");
   
   static {
     for(int i=0; i<dataNodes.length; i++) {



Mime
View raw message