hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1327314 - in /hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/journalservice/ src/test/java/org/apache/hadoop/hdfs/server/journalservice/
Date Tue, 17 Apr 2012 22:57:54 GMT
Author: szetszwo
Date: Tue Apr 17 22:57:53 2012
New Revision: 1327314

URL: http://svn.apache.org/viewvc?rev=1327314&view=rev
Log:
HDFS-3196. Add Journal and JournalDiskWriter for journal service.

Added:
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
Modified:
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt?rev=1327314&r1=1327313&r2=1327314&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt Tue Apr 17 22:57:53 2012
@@ -17,6 +17,9 @@ HDFS-3092 branch changes
 
     HDFS-3283. Add http server to journal service.  (Brandon Li via szetszwo)
 
+    HDFS-3196. Add Journal and JournalDiskWriter for journal service.
+    (szetszwo)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

Added: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java?rev=1327314&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java (added)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java Tue Apr 17 22:57:53 2012
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.common.Util;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+/** The journal stored in local directories. */
+class Journal {
+  static final Log LOG = LogFactory.getLog(Journal.class);
+
+  private final FSImage image;
+  private volatile boolean isFormatted;
+
+  /**
+   * Constructor. It is possible that the directory is not formatted.
+   */
+  Journal(Configuration conf) throws IOException {
+    this.image = new FSImage(conf, Collections.<URI>emptyList(), getEditDirs(conf));
+
+    final Map<StorageDirectory, StorageState> states
+        = new HashMap<StorageDirectory, StorageState>();
+    isFormatted = image.recoverStorageDirs(StartupOption.REGULAR, states);
+    for(Map.Entry<StorageDirectory, StorageState> e : states.entrySet()) {
+      LOG.info(e.getKey() + ": " + e.getValue());
+    }
+    LOG.info("The journal is " + (isFormatted? "already": "not yet")
+        + " formatted: " + image.getStorage());
+  }
+
+  /**
+   * Format the local edit directory.
+   */
+  synchronized void format(int namespaceId, String clusterId) throws IOException {
+    if (isFormatted) {
+      throw new IllegalStateException("The joural is already formatted.");
+    }
+    final NNStorage s = image.getStorage(); 
+    s.format(new NamespaceInfo(namespaceId, clusterId, "dummy-bpid", 0, 0));
+    isFormatted = true;
+    LOG.info("Formatted journal: " + s);
+  }
+
+  boolean isFormatted() {
+    return isFormatted;
+  }
+  
+  StorageInfo getStorageInfo() {
+    return image.getStorage();
+  }
+
+  synchronized void verifyVersion(JournalService service, NamespaceInfo info
+      ) throws IOException {
+    if (!isFormatted) {
+      return;
+    }
+
+    final StorageInfo stored = image.getStorage();
+    if (!stored.getClusterID().equals(info.getClusterID())) {
+      throw new IOException("Cluster IDs not matched: stored = "
+          + stored.getClusterID() + " != passed = " + info.getClusterID());
+    }
+    if (stored.getNamespaceID() != info.getNamespaceID()) {
+      throw new IOException("Namespace IDs not matched: stored = "
+          + stored.getNamespaceID() + " != passed = " + info.getNamespaceID());
+    }
+    if (stored.getLayoutVersion() != info.getLayoutVersion()) {
+      throw new IOException("Layout versions not matched: stored = "
+          + stored.getLayoutVersion() + " != passed = " + info.getLayoutVersion());
+    }
+    if (stored.getCTime() != info.getCTime()) {
+      throw new IOException("CTimes not matched: stored = "
+          + stored.getCTime() + " != passed = " + info.getCTime());
+    }
+  }
+
+  void close() throws IOException {
+    image.close();
+  }
+
+  FSEditLog getEditLog() {
+    return image.getEditLog();
+  }
+
+  static List<URI> getEditDirs(Configuration conf) throws IOException {
+    final Collection<String> dirs = conf.getTrimmedStringCollection(
+        DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY);
+    LOG.info(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY + " = " + dirs);
+    return Util.stringCollectionAsURIs(dirs);
+  }
+}
\ No newline at end of file

Added: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java?rev=1327314&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java (added)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalDiskWriter.java Tue Apr 17 22:57:53 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+/** A JournalListener for writing journal to an edit log. */
+class JournalDiskWriter implements JournalListener {
+  static final Log LOG = LogFactory.getLog(JournalDiskWriter.class);
+
+  private final Journal journal;
+
+  /**
+   * Constructor. It is possible that the directory is not formatted. In this
+   * case, it creates dummy entries and storage is later formatted.
+   */
+  JournalDiskWriter(Journal journal) throws IOException {
+    this.journal = journal;
+  }
+
+  Journal getJournal() {
+    return journal;
+  }
+
+  @Override
+  public synchronized void verifyVersion(JournalService service,
+      NamespaceInfo info) throws IOException {
+    journal.verifyVersion(service, info);
+   }
+
+  @Override
+  public synchronized void journal(JournalService service, long firstTxId,
+      int numTxns, byte[] records) throws IOException {
+    journal.getEditLog().journal(firstTxId, numTxns, records);
+  }
+
+  @Override
+  public synchronized void startLogSegment(JournalService service, long txid
+      ) throws IOException {
+    journal.getEditLog().startLogSegment(txid, false);
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java?rev=1327314&r1=1327313&r2=1327314&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalListener.java Tue Apr 17 22:57:53 2012
@@ -35,8 +35,10 @@ public interface JournalListener {
    * 
    * The application using {@link JournalService} can stop the service if
    * {@code info} validation fails.
+   * @throws IOException 
    */
-  public void verifyVersion(JournalService service, NamespaceInfo info);
+  public void verifyVersion(JournalService service, NamespaceInfo info
+      ) throws IOException;
   
   /**
    * Process the received Journal record

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java?rev=1327314&r1=1327313&r2=1327314&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java Tue Apr 17 22:57:53 2012
@@ -17,39 +17,18 @@
  */
 package org.apache.hadoop.hdfs.server.journalservice;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
-
-import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalProtocolService;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
-import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.FSImage;
-import org.apache.hadoop.hdfs.server.namenode.NNStorage;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
 import org.apache.hadoop.hdfs.server.protocol.FencedException;
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
@@ -65,8 +44,6 @@ import org.apache.hadoop.security.UserGr
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY;
-
 /**
  * This class interfaces with the namenode using {@link JournalProtocol} over
  * RPC. It has two modes: <br>
@@ -85,7 +62,6 @@ import static org.apache.hadoop.hdfs.DFS
 public class JournalService implements JournalProtocol {
   public static final Log LOG = LogFactory.getLog(JournalService.class.getName());
 
-  private final JournalListener listener;
   private final InetSocketAddress nnAddress;
   private NamenodeRegistration registration;
   private final NamenodeProtocol namenode;
@@ -93,11 +69,10 @@ public class JournalService implements J
   private final RPC.Server rpcServer;
   private long epoch = 0;
   private String fencerInfo;
-  private StorageInfo storageInfo;
-  private Configuration conf;
-  private FSEditLog editLog;
-  private FSImage image;
-  
+
+  private final Journal journal;
+  private final JournalListener listener;
+
   enum State {
     /** The service is initialized and ready to start. */
     INIT(false, false),
@@ -193,66 +168,30 @@ public class JournalService implements J
       throws IOException {
     this.nnAddress = nnAddr;
     this.listener = listener;
+    this.journal = new Journal(conf);
     this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
         NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
         .getProxy();
     this.rpcServer = createRpcServer(conf, serverAddress, this);
-    this.conf = conf;
- 
-    try {
-      initializeJournalStorage(conf);
-    } catch (IOException ioe) {
-      LOG.info("Exception in initialize: " + ioe.getMessage());
-      throw ioe;
-    }
   }
   
-  /** This routine initializes the storage directory. It is possible that
-   *  the directory is not formatted. In this case, it creates dummy entries
-   *  and storage is later formatted.
-   */
-  private void initializeJournalStorage(Configuration conf) throws IOException {
-    
-    boolean isFormatted = false;
-    Collection<URI> dirsToFormat = new ArrayList<URI>();
-    List<URI> editUrisToFormat = getJournalEditsDirs(conf);
-
-    // Load the newly formatted image, using all of the directories
-    image = new FSImage(conf, dirsToFormat, editUrisToFormat);
-    Map<StorageDirectory, StorageState> dataDirStates =
-          new HashMap<StorageDirectory, StorageState>();
-    isFormatted = image
-        .recoverStorageDirs(StartupOption.REGULAR, dataDirStates);
-
-    if (isFormatted == true) {
-      // Directory has been formatted. So, it should have a versionfile.
-      this.editLog = image.getEditLog();
-      Iterator<StorageDirectory> sdit = image.getStorage().dirIterator(
-          NNStorage.NameNodeDirType.IMAGE);
-      StorageDirectory sd = sdit.next();
-
-      Properties props = Storage.readPropertiesFile(sd.getVersionFile());
-      String cid = props.getProperty("clusterID");
-      String nsid = props.getProperty("namespaceID");
-      String layout = props.getProperty("layoutVersion");
-      storageInfo = new StorageInfo(Integer.parseInt(layout),
-          Integer.parseInt(nsid), cid, 0);
-
-      LOG.info("JournalService constructor, nsid "
-          + storageInfo.getNamespaceID() + " cluster id "
-          + storageInfo.getClusterID());
-
-      String addr = NetUtils.getHostPortString(rpcServer.getListenerAddress());
-      registration = new NamenodeRegistration(addr, "", storageInfo,
-          NamenodeRole.BACKUP);
-    } else {
-      // Storage directory has not been formatted. So create dummy entries for now.
-      image = new FSImage(conf);
-      storageInfo = image.getStorage();
-      editLog = null;
+  Journal getJournal() {
+    return journal;
+  }
+
+  synchronized NamenodeRegistration getRegistration() {
+    if (!journal.isFormatted()) {
+      throw new IllegalStateException("Journal is not formatted.");
+    }
+
+    if (registration == null) {
+      registration = new NamenodeRegistration(
+          NetUtils.getHostPortString(rpcServer.getListenerAddress()), "",
+          journal.getStorageInfo(), NamenodeRole.BACKUP);
     }
+    return registration;
   }
-  
+
   /**
    * Start the service.
    */
@@ -304,9 +243,10 @@ public class JournalService implements J
    * Stop the service. For application with RPC Server managed outside, the
    * RPC Server must be stopped the application.
    */
-  public void stop() {
+  public void stop() throws IOException {
     if (!stateHandler.isStopped()) {
       rpcServer.stop();
+      journal.close();
     }
   }
 
@@ -332,47 +272,15 @@ public class JournalService implements J
     listener.startLogSegment(this, txid);
     stateHandler.startLogSegment();
   }
-  
-  private void setupStorage(JournalInfo jinfo) throws IOException {
-    formatStorage(jinfo.getNamespaceId(), jinfo.getClusterId());
-  }
-  
-  private void setupStorage(NamespaceInfo nsinfo) throws IOException {
-    formatStorage(nsinfo.getNamespaceID(), nsinfo.getClusterID());
-  }
-  
-  /**
-   * Will format the edits dir and save the nsid + cluster id.
-   * @param nsId
-   * @param clusterId
-   * @throws IOException
-   */
-  private void formatStorage(int nsId, String clusterId) throws IOException {
-    Collection<URI> dirsToFormat = new ArrayList<URI>();
-    List<URI> editUrisToFormat = getJournalEditsDirs(conf);
-    NNStorage nnStorage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
-    LOG.info("Setting up storage for nsid " + nsId + " clusterid " + clusterId);
-    nnStorage.format(new NamespaceInfo(nsId, clusterId, "journalservice", 0, 0));
-    image = new FSImage(conf, dirsToFormat, editUrisToFormat); 
-    this.editLog = image.getEditLog();
-
-    storageInfo = new StorageInfo(LayoutVersion.getCurrentLayoutVersion(),
-        nsId, clusterId, 0);
-    registration = new NamenodeRegistration(
-        NetUtils.getHostPortString(rpcServer.getListenerAddress()), "",
-        storageInfo, NamenodeRole.BACKUP);
-  }
 
   @Override
   public FenceResponse fence(JournalInfo journalInfo, long epoch,
       String fencerInfo) throws IOException {
     LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
    
-    // This implies that this is the first fence on the journal service
-    // It does not have any nsid or cluster id info.
-    if ((storageInfo.getClusterID() == null)
-        || (storageInfo.getNamespaceID() == 0)) {
-      setupStorage(journalInfo);
+    // It is the first fence if the journal is not formatted, 
+    if (!journal.isFormatted()) {
+      journal.format(journalInfo.getNamespaceId(), journalInfo.getClusterId());
     }
     verifyFence(epoch, fencerInfo);
     verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
@@ -422,19 +330,18 @@ public class JournalService implements J
    */
   private void verify(int nsid, String clusid) throws IOException {
     String errorMsg = null;
-    int expectedNamespaceID = registration.getNamespaceID();
+    final NamenodeRegistration reg = getRegistration(); 
 
-    if (nsid != expectedNamespaceID) {
+    if (nsid != reg.getNamespaceID()) {
       errorMsg = "Invalid namespaceID in journal request - expected "
-          + expectedNamespaceID + " actual " + nsid;
+          + reg.getNamespaceID() + " actual " + nsid;
       LOG.warn(errorMsg);
       throw new UnregisteredNodeException(errorMsg);
     }
     if ((clusid == null)
-        || (!clusid.equals(registration.getClusterID()))) {
+        || (!clusid.equals(reg.getClusterID()))) {
       errorMsg = "Invalid clusterId in journal request - incoming "
-          + clusid + " expected "
-          + registration.getClusterID();
+          + clusid + " expected " + reg.getClusterID();
       LOG.warn(errorMsg);
       throw new UnregisteredNodeException(errorMsg);
     }
@@ -452,7 +359,7 @@ public class JournalService implements J
    * Register this service with the active namenode.
    */
   private void registerWithNamenode() throws IOException {
-    NamenodeRegistration nnReg = namenode.register(registration);
+    NamenodeRegistration nnReg = namenode.register(getRegistration());
     String msg = null;
     if(nnReg == null) { // consider as a rejection
       msg = "Registration rejected by " + nnAddress;
@@ -472,9 +379,8 @@ public class JournalService implements J
     // If this is the first initialization of journal service, then storage
     // directory will be setup. Otherwise, nsid and clusterid has to match with
     // the info saved in the edits dir.
-    if ((storageInfo.getClusterID() == null)
-        || (storageInfo.getNamespaceID() == 0)) {
-      setupStorage(nsInfo);
+    if (!journal.isFormatted()) {
+      journal.format(nsInfo.getNamespaceID(), nsInfo.getClusterID());
     } else {
       verify(nsInfo.getNamespaceID(), nsInfo.getClusterID());
     }
@@ -484,17 +390,4 @@ public class JournalService implements J
   long getEpoch() {
     return epoch;
   }
-  
-  /**
-   * Returns edit directories that are shared between primary and secondary.
-   * @param conf
-   * @return Collection of edit directories.
-   */
-  public List<URI> getJournalEditsDirs(Configuration conf) {
-    // don't use getStorageDirs here, because we want an empty default
-    // rather than the dir in /tmp
-    Collection<String> dirNames = conf.getTrimmedStringCollection(
-        DFS_JOURNAL_EDITS_DIR_KEY);
-    return Util.stringCollectionAsURIs(dirNames);
-  }
 }

Added: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java?rev=1327314&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java (added)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java Tue Apr 17 22:57:53 2012
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.journalservice;
+
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestJournal {
+  static final Log LOG = LogFactory.getLog(TestJournal.class);
+  
+  static Configuration newConf(String name) {
+    Configuration conf = new HdfsConfiguration();
+    File dir = new File(MiniDFSCluster.getBaseDirectory(), name + "-edits");
+    Assert.assertTrue(dir.mkdirs());
+    conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, dir.toURI().toString());
+    return conf;
+  }
+
+  @Test
+  public void testFormat() throws Exception {
+    final Configuration conf = newConf("testFormat");
+    final Journal j = new Journal(conf);
+    LOG.info("Initial  : " + j.getStorageInfo());
+    Assert.assertFalse(j.isFormatted());
+
+    //format
+    final int namespaceId = 123;
+    final String clusterId = "my-cluster-id";
+    j.format(namespaceId, clusterId);
+    Assert.assertTrue(j.isFormatted());
+
+    final StorageInfo info = j.getStorageInfo();
+    LOG.info("Formatted: " + info);
+    
+    Assert.assertEquals(namespaceId, info.getNamespaceID());
+    Assert.assertEquals(clusterId, info.getClusterID());
+    j.close();
+    
+    //create another Journal object
+    final StorageInfo another = new Journal(conf).getStorageInfo();
+    Assert.assertEquals(info.toString(), another.toString());
+  }
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java?rev=1327314&r1=1327313&r2=1327314&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java Tue Apr 17 22:57:53 2012
@@ -17,12 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.journalservice;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.URI;
-
-import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,15 +26,14 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
 import org.apache.hadoop.hdfs.server.protocol.FencedException;
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -46,10 +41,9 @@ import org.mockito.Mockito;
  * Tests for {@link JournalService}
  */
 public class TestJournalService {
-  private MiniDFSCluster cluster;
-  private Configuration conf = new HdfsConfiguration();
   static final Log LOG = LogFactory.getLog(TestJournalService.class);
-  
+  static final InetSocketAddress RPC_ADDR = new InetSocketAddress(0);
+
   /**
    * Test calls backs {@link JournalListener#startLogSegment(JournalService, long)} and
    * {@link JournalListener#journal(JournalService, long, int, byte[])} are
@@ -57,47 +51,65 @@ public class TestJournalService {
    */
   @Test
   public void testCallBacks() throws Exception {
+    Configuration conf = TestJournal.newConf("testCallBacks");
     JournalListener listener = Mockito.mock(JournalListener.class);
     JournalService service = null;
+    MiniDFSCluster cluster = null;
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive(0);
-      FileSystem fs = FileSystem.getLocal(conf);
-      File journalEditsDir = cluster.getJournalEditsDir();
-      boolean result = fs.mkdirs(new Path(journalEditsDir.toString()));
-      conf.set(DFS_JOURNAL_EDITS_DIR_KEY, journalEditsDir.toString());
-      service = startJournalService(listener);
+      InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
+      service = newJournalService(nnAddr, listener, conf);
+      service.start();
       verifyRollLogsCallback(service, listener);
-      verifyJournalCallback(service, listener);
-      verifyFence(service, listener, cluster.getNameNode(0));
+      verifyJournalCallback(cluster.getFileSystem(), service, listener);
     } finally {
-      if (service != null) {
-        stopJournalService(service);
-      }
       if (cluster != null) {
         cluster.shutdown();
       }
+      if (service != null) {
+        service.stop();
+      }
     }
   }
-  
-  private JournalService restartJournalService(JournalService service,
-      JournalListener listener) throws IOException {
-    stopJournalService(service);
-    return (startJournalService(listener));
-  }
 
-  private JournalService startJournalService(JournalListener listener)
-      throws IOException {
-    InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
-    InetSocketAddress serverAddr = new InetSocketAddress(0);
-    JournalService service = new JournalService(conf, nnAddr, serverAddr,
-        listener);
-    service.start();
-    return service;
-  }
-  
-  private void stopJournalService(JournalService service) throws IOException {
+  @Test
+  public void testFence() throws Exception {
+    final Configuration conf = TestJournal.newConf("testFence");
+    final JournalListener listener = Mockito.mock(JournalListener.class);
+    final InetSocketAddress nnAddress;
+
+    JournalService service = null;
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive(0);
+      NameNode nn = cluster.getNameNode(0);
+      nnAddress = nn.getNameNodeAddress();
+      service = newJournalService(nnAddress, listener, conf);
+      service.start();
+      String cid = nn.getNamesystem().getClusterId();
+      int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
+      int lv = nn.getNamesystem().getFSImage().getLayoutVersion();
+
+      verifyFence(service, listener, cid, nsId, lv);
+    } finally {
+      cluster.shutdown();
+    }
+
+    //test restart journal service
+    StorageInfo before = service.getJournal().getStorageInfo();
+    LOG.info("before: " + before);
     service.stop();
+    service = newJournalService(nnAddress, listener, conf);
+    StorageInfo after = service.getJournal().getStorageInfo();
+    LOG.info("after : " + after);
+    Assert.assertEquals(before.toString(), after.toString());
+  }
+
+  private JournalService newJournalService(InetSocketAddress nnAddr,
+      JournalListener listener, Configuration conf) throws IOException {
+    return new JournalService(conf, nnAddr, RPC_ADDR, listener);
   }
   
   /**
@@ -114,19 +126,17 @@ public class TestJournalService {
    * File system write operations should result in JournalListener call
    * backs.
    */
-  private void verifyJournalCallback(JournalService s, JournalListener l) throws IOException {
+  private void verifyJournalCallback(FileSystem fs, JournalService s,
+      JournalListener l) throws IOException {
     Path fileName = new Path("/tmp/verifyJournalCallback");
-    FileSystem fs = cluster.getFileSystem();
     FileSystemTestHelper.createFile(fs, fileName);
     fs.delete(fileName, true);
     Mockito.verify(l, Mockito.atLeastOnce()).journal(Mockito.eq(s),
         Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any());
   }
   
-  public void verifyFence(JournalService s, JournalListener listener, NameNode nn) throws Exception {
-    String cid = nn.getNamesystem().getClusterId();
-    int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
-    int lv = nn.getNamesystem().getFSImage().getLayoutVersion();
+  void verifyFence(JournalService s, JournalListener listener,
+      String cid, int nsId, int lv) throws Exception {
     
     // Fence the journal service
     JournalInfo info = new JournalInfo(lv, cid, nsId);
@@ -171,7 +181,6 @@ public class TestJournalService {
       LOG.info(ignore.getMessage());
     } 
     
-    s = restartJournalService(s, listener);
     // New epoch higher than the current epoch is successful
     resp = s.fence(info, currentEpoch+1, "fencer");
     Assert.assertNotNull(resp);



Mime
View raw message