hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1325056 - in /hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/server/journalservice/ src/test/java/org/apache/hadoop/hdfs/server/journ...
Date Wed, 11 Apr 2012 22:57:47 GMT
Author: szetszwo
Date: Wed Apr 11 22:57:46 2012
New Revision: 1325056

URL: http://svn.apache.org/viewvc?rev=1325056&view=rev
Log:
HDFS-3213 Persist the cluster id and nsid in JournalService storage directory.  Contributed
by Hari Mankude

Modified:
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt?rev=1325056&r1=1325055&r2=1325056&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
Wed Apr 11 22:57:46 2012
@@ -5,9 +5,13 @@ HDFS-3092 branch changes
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+
     HDFS-3185. Add configuration of Journal Service Daemons. 
     (Hari Mankude via suresh)
 
+    HDFS-3213. Persist the cluster id and nsid in JournalService storage
+    directory.  (Hari Mankude via szetszwo)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java?rev=1325056&r1=1325055&r2=1325056&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
Wed Apr 11 22:57:46 2012
@@ -41,6 +41,10 @@ public class UnregisteredNodeException e
   public UnregisteredNodeException(NodeRegistration nodeReg) {
     super("Unregistered server: " + nodeReg.toString());
   }
+  
+  public UnregisteredNodeException(String msg) {
+    super("Unregistered server: " + msg);
+  }
 
   /**
    * The exception is thrown if a different data-node claims the same

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java?rev=1325056&r1=1325055&r2=1325056&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
Wed Apr 11 22:57:46 2012
@@ -19,6 +19,10 @@ package org.apache.hadoop.hdfs.server.jo
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +35,9 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
 import org.apache.hadoop.hdfs.server.protocol.FencedException;
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
@@ -66,12 +73,14 @@ public class JournalService implements J
 
   private final JournalListener listener;
   private final InetSocketAddress nnAddress;
-  private final NamenodeRegistration registration;
+  private NamenodeRegistration registration;
   private final NamenodeProtocol namenode;
   private final StateHandler stateHandler = new StateHandler();
   private final RPC.Server rpcServer;
   private long epoch = 0;
   private String fencerInfo;
+  private StorageInfo storageInfo;
+  private Configuration conf;
   
   enum State {
     /** The service is initialized and ready to start. */
@@ -172,11 +181,16 @@ public class JournalService implements J
         NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
         .getProxy();
     this.rpcServer = createRpcServer(conf, serverAddress, this);
+    this.conf = conf;
+  
+    // Load the newly formatted image, using all of the directories
+    FSImage image = new FSImage(conf);
+    storageInfo = image.getStorage();
+    LOG.info("JournalService constructor, nsid " + storageInfo.getNamespaceID()
+        + " cluster id " + storageInfo.getClusterID());
 
     String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
-    StorageInfo storage = new StorageInfo(
-        LayoutVersion.getCurrentLayoutVersion(), 0, "", 0);
-    registration = new NamenodeRegistration(addr, "", storage,
+    registration = new NamenodeRegistration(addr, "", storageInfo,
         NamenodeRole.BACKUP);
   }
   
@@ -187,10 +201,10 @@ public class JournalService implements J
     stateHandler.start();
 
     // Start the RPC server
-    LOG.info("Starting rpc server");
+    LOG.info("Starting journal service rpc server");
     rpcServer.start();
 
-    for(boolean registered = false, handshakeComplete = false; ; ) {
+    for (boolean registered = false, handshakeComplete = false;;) {
       try {
         // Perform handshake
         if (!handshakeComplete) {
@@ -198,7 +212,7 @@ public class JournalService implements J
           handshakeComplete = true;
           LOG.info("handshake completed");
         }
-        
+
         // Register with the namenode
         if (!registered) {
           registerWithNamenode();
@@ -211,7 +225,7 @@ public class JournalService implements J
       } catch (Exception e) {
         LOG.warn("Encountered exception ", e);
       }
-      
+
       try {
         Thread.sleep(1000);
       } catch (InterruptedException ie) {
@@ -259,13 +273,47 @@ public class JournalService implements J
     listener.rollLogs(this, txid);
     stateHandler.startLogSegment();
   }
+  
+  private void setupStorage(JournalInfo jinfo) throws IOException {
+    setupStorage(jinfo.getNamespaceId(), jinfo.getClusterId());
+  }
+  
+  private void setupStorage(NamespaceInfo nsinfo) throws IOException {
+    setupStorage(nsinfo.getNamespaceID(), nsinfo.getClusterID());
+  }
+  
+  // Will format the edits dir and save the nsid + cluster id.
+  private void setupStorage(int nsId, String clusterId) throws IOException {
+    // For now, use the namespacedirs and edits dir to save the journal info.
+    // Going forward, this can be modified to extract journal specific edits
+    // dir.
+    Collection<URI> dirsToFormat = new ArrayList<URI>();
+    List<URI> editUrisToFormat = FSNamesystem
+        .getNamespaceEditsDirs(conf, false);
+    NNStorage nnStorage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
+    LOG.info("Setting up storage for nsid " + nsId + " clusterid " + clusterId);
+    nnStorage.format(new NamespaceInfo(nsId, clusterId, "journalservice", 0, 0));
+
+    storageInfo = new StorageInfo(LayoutVersion.getCurrentLayoutVersion(),
+        nsId, clusterId, 0);
+    registration = new NamenodeRegistration(
+        NetUtils.getHostPortString(rpcServer.getListenerAddress()), "",
+        storageInfo, NamenodeRole.BACKUP);
+  }
 
   @Override
   public FenceResponse fence(JournalInfo journalInfo, long epoch,
       String fencerInfo) throws IOException {
     LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
+   
+    // This implies that this is the first fence on the journal service.
+    // It does not have any nsid or cluster id info.
+    if ((storageInfo.getClusterID() == null)
+        || (storageInfo.getNamespaceID() == 0)) {
+      setupStorage(journalInfo);
+    }
     verifyFence(epoch, fencerInfo);
-    verify(journalInfo);
+    verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
     long previousEpoch = epoch;
     this.epoch = epoch;
     this.fencerInfo = fencerInfo;
@@ -310,20 +358,23 @@ public class JournalService implements J
   /** 
    * Verifies a journal request
    */
-  private void verify(JournalInfo journalInfo) throws IOException {
+  private void verify(int nsid, String clusid) throws IOException {
     String errorMsg = null;
     int expectedNamespaceID = registration.getNamespaceID();
-    if (journalInfo.getNamespaceId() != expectedNamespaceID) {
-      errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
-          + " actual " + journalInfo.getNamespaceId();
+
+    if (nsid != expectedNamespaceID) {
+      errorMsg = "Invalid namespaceID in journal request - expected "
+          + expectedNamespaceID + " actual " + nsid;
       LOG.warn(errorMsg);
-      throw new UnregisteredNodeException(journalInfo);
-    } 
-    if (!journalInfo.getClusterId().equals(registration.getClusterID())) {
-      errorMsg = "Invalid clusterId in journal request - expected "
-          + journalInfo.getClusterId() + " actual " + registration.getClusterID();
+      throw new UnregisteredNodeException(errorMsg);
+    }
+    if ((clusid == null)
+        || (!clusid.equals(registration.getClusterID()))) {
+      errorMsg = "Invalid clusterId in journal request - incoming "
+          + clusid + " expected "
+          + registration.getClusterID();
       LOG.warn(errorMsg);
-      throw new UnregisteredNodeException(journalInfo);
+      throw new UnregisteredNodeException(errorMsg);
     }
   }
   
@@ -332,7 +383,7 @@ public class JournalService implements J
    */
   private void verify(long e, JournalInfo journalInfo) throws IOException {
     verifyEpoch(e);
-    verify(journalInfo);
+    verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
   }
   
   /**
@@ -355,7 +406,16 @@ public class JournalService implements J
   private void handshake() throws IOException {
     NamespaceInfo nsInfo = namenode.versionRequest();
     listener.verifyVersion(this, nsInfo);
-    registration.setStorageInfo(nsInfo);
+
+    // If this is the first initialization of journal service, then storage
+    // directory will be set up. Otherwise, nsid and clusterid have to match
+    // the info saved in the edits dir.
+    if ((storageInfo.getClusterID() == null)
+        || (storageInfo.getNamespaceID() == 0)) {
+      setupStorage(nsInfo);
+    } else {
+      verify(nsInfo.getNamespaceID(), nsInfo.getClusterID());
+    }
   }
 
   @VisibleForTesting

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java?rev=1325056&r1=1325055&r2=1325056&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
Wed Apr 11 22:57:46 2012
@@ -22,12 +22,15 @@ import java.net.InetSocketAddress;
 
 import junit.framework.Assert;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
 import org.apache.hadoop.hdfs.server.protocol.FencedException;
@@ -41,6 +44,7 @@ import org.mockito.Mockito;
 public class TestJournalService {
   private MiniDFSCluster cluster;
   private Configuration conf = new HdfsConfiguration();
+  static final Log LOG = LogFactory.getLog(TestJournalService.class);
   
   /**
    * Test calls backs {@link JournalListener#rollLogs(JournalService, long)} and
@@ -60,7 +64,7 @@ public class TestJournalService {
       verifyFence(service, cluster.getNameNode(0));
     } finally {
       if (service != null) {
-        service.stop();
+        stopJournalService(service);
       }
       if (cluster != null) {
         cluster.shutdown();
@@ -78,6 +82,10 @@ public class TestJournalService {
     return service;
   }
   
+  private void stopJournalService(JournalService service) throws IOException {
+    service.stop();
+  }
+  
   /**
    * Starting {@link JournalService} should result in Namenode calling
    * {@link JournalService#startLogSegment}, resulting in callback 
@@ -109,19 +117,45 @@ public class TestJournalService {
     // Fence the journal service
     JournalInfo info = new JournalInfo(lv, cid, nsId);
     long currentEpoch = s.getEpoch();
-    
+   
     // New epoch lower than the current epoch is rejected
     try {
       s.fence(info, (currentEpoch - 1), "fencer");
-    } catch (FencedException ignore) { /* Ignored */ } 
+      Assert.fail();
+    } catch (FencedException ignore) { /* Ignored */ }
     
     // New epoch equal to the current epoch is rejected
     try {
       s.fence(info, currentEpoch, "fencer");
-    } catch (FencedException ignore) { /* Ignored */ } 
+      Assert.fail();
+    } catch (FencedException ignore) { /* Ignored */ }
     
     // New epoch higher than the current epoch is successful
     FenceResponse resp = s.fence(info, currentEpoch+1, "fencer");
     Assert.assertNotNull(resp);
+    
+    JournalInfo badInfo = new JournalInfo(lv, "fake", nsId);
+    currentEpoch = s.getEpoch();
+   
+    // Send in the wrong cluster id. fence should fail
+    try {
+      s.fence(badInfo, currentEpoch+1, "fencer");
+      Assert.fail();
+      
+    } catch (UnregisteredNodeException ignore) {
+      LOG.info(ignore.getMessage());
+    }
+  
+    badInfo = new JournalInfo(lv, cid, nsId+1);
+    currentEpoch = s.getEpoch();
+    
+    // Send in the wrong nsid. fence should fail
+    try {
+      s.fence(badInfo, currentEpoch+1, "fencer");
+      Assert.fail();
+    } catch (UnregisteredNodeException ignore) {
+      LOG.info(ignore.getMessage());
+    } 
+   
   }
 }
\ No newline at end of file



Mime
View raw message