hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1334792 - in /hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/journalservice/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/...
Date Sun, 06 May 2012 21:45:36 GMT
Author: suresh
Date: Sun May  6 21:45:35 2012
New Revision: 1334792

URL: http://svn.apache.org/viewvc?rev=1334792&view=rev
Log:
HDFS-3186. Add ability to journal service to sync journals from another active journal service.
Contributed by Brandon Li.


Modified:
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java
    hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3092.txt
Sun May  6 21:45:35 2012
@@ -26,6 +26,9 @@ HDFS-3092 branch changes
     HDFS-3313. Create a protocol for journal service synchronziation.  (Brandon
     Li via szetszwo)
 
+    HDFS-3186. Add ability to journal service to sync journals from another
+    active journal service. (Brandon Li via suresh)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
Sun May  6 21:45:35 2012
@@ -204,6 +204,7 @@ public class DFSConfigKeys extends Commo
   // This is a comma separated host:port list of addresses hosting the journal service
   public static final String  DFS_JOURNAL_ADDRESS_KEY = "dfs.journal.addresses";
   public static final String  DFS_JOURNAL_EDITS_DIR_KEY = "dfs.journal.edits.dir";
+  public static final String  DFS_JOURNAL_HTTP_ADDRESS_KEY = "dfs.journal.http-addresses";
   public static final String  DFS_JOURNAL_HTTPS_PORT_KEY = "dfs.journal.https-port";
   public static final int     DFS_JOURNAL_HTTPS_PORT_DEFAULT = 50510;
   public static final String  DFS_JOURNAL_KRB_HTTPS_USER_NAME_KEY = "dfs.journal.kerberos.https.principal";

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
Sun May  6 21:45:35 2012
@@ -1025,9 +1025,21 @@ public class DFSUtil {
     }
   }
   
+  private static Collection<InetSocketAddress> getAddresses(Configuration conf,
+      String addrKey) {
+    Collection<String> addresses = conf.getTrimmedStringCollection(addrKey);
+    Collection<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
+    for (String address : emptyAsSingletonNull(addresses)) {
+      if (address == null) {
+        continue;
+      }
+      ret.add(NetUtils.createSocketAddr(address));
+    }
+    return ret;
+  }
   
   /**
-   * Returns list of InetSocketAddresses of journalnodes from the
+   * Returns list of RPC server InetSocketAddresses of journal services from the
    * configuration.
    * 
    * @param conf configuration
@@ -1035,15 +1047,38 @@ public class DFSUtil {
    */
   public static Collection<InetSocketAddress> getJournalNodeAddresses(
       Configuration conf) {
-    Collection<String> jnames = conf
-        .getTrimmedStringCollection(DFS_JOURNAL_ADDRESS_KEY);
-    Collection<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
-    for (String jname : emptyAsSingletonNull(jnames)) {
-      if (jname == null) {
-        continue;
+    return getAddresses(conf, DFS_JOURNAL_ADDRESS_KEY);
+  }
+  
+  /**
+   * Returns list of http InetSocketAddresses of journal services from the
+   * configuration.
+   * 
+   * @param conf configuration
+   * @return list of http InetSocketAddresses
+   */
+  public static Collection<InetSocketAddress> getJournalNodeHttpAddresses(
+      Configuration conf) {
+    return getAddresses(conf, DFS_JOURNAL_HTTP_ADDRESS_KEY);
+  }
+  
+  /**
+   * Returns corresponding rpc address with the hostname running journal
+   * service.
+   * 
+   * @param conf configuration
+   * @param hostname the hostname in the http address
+   * @return rpc address of the journal service
+   */
+  public static InetSocketAddress getJournalRpcAddrFromHostName(
+      Configuration conf, String hostname) {
+    Collection<InetSocketAddress> jRpcAddr = DFSUtil
+        .getJournalNodeAddresses(conf);
+    for (InetSocketAddress addr : jRpcAddr) {
+      if (addr != null && addr.getHostName().equals(hostname)) {
+        return addr;
       }
-      ret.add(NetUtils.createSocketAddr(jname));
     }
-    return ret;
+    return null;
   }
 }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/Journal.java
Sun May  6 21:45:35 2012
@@ -117,7 +117,7 @@ class Journal {
     return image.getEditLog();
   }
   
-  RemoteEditLogManifest getRemoteEditLogs(long sinceTxId) throws IOException {
+  RemoteEditLogManifest getEditLogManifest(long sinceTxId) throws IOException {
     return image.getEditLog().getEditLogManifest(sinceTxId);
   }
   

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalHttpServer.java
Sun May  6 21:45:35 2012
@@ -63,7 +63,7 @@ public class JournalHttpServer {
   private final Configuration conf;
 
   JournalHttpServer(Configuration conf, Journal journal,
-      InetSocketAddress bindAddress) throws Exception {
+      InetSocketAddress bindAddress) {
     this.conf = conf;
     this.localJournal = journal;
     this.httpAddress = bindAddress;
@@ -125,12 +125,20 @@ public class JournalHttpServer {
         + " and https port is: " + httpsPort);
   }
 
-  void stop() throws Exception {
+  void stop() throws IOException {
     if (httpServer != null) {
-      httpServer.stop();
+      try {
+        httpServer.stop();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
     }
   }
 
+  InetSocketAddress getHttpAddress() {
+    return httpAddress;
+  }
+  
   public static Journal getJournalFromContext(ServletContext context) {
     return (Journal) context.getAttribute(JOURNAL_ATTRIBUTE_KEY);
   }
@@ -145,7 +153,7 @@ public class JournalHttpServer {
    * @return true if a new image has been downloaded and needs to be loaded
    * @throws IOException
    */
-  public boolean downloadEditFiles(final String jnHostPort,
+  boolean downloadEditFiles(final String jnHostPort,
       final RemoteEditLogManifest manifest) throws IOException {
 
     // Sanity check manifest

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/journalservice/JournalService.java
Sun May  6 21:45:35 2012
@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.journalservice;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@@ -34,9 +37,12 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalSyncProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
 import org.apache.hadoop.hdfs.server.protocol.FencedException;
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.JournalServiceProtocols;
 import org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -47,6 +53,7 @@ import org.apache.hadoop.ipc.ProtobufRpc
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Daemon;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingService;
@@ -68,14 +75,20 @@ import com.google.protobuf.BlockingServi
  */
 public class JournalService implements JournalServiceProtocols {
   public static final Log LOG = LogFactory.getLog(JournalService.class.getName());
-
   private final InetSocketAddress nnAddress;
   private NamenodeRegistration registration;
   private final NamenodeProtocol namenode;
   private final StateHandler stateHandler = new StateHandler();
   private final RPC.Server rpcServer;
+  private final JournalHttpServer httpServer;
   private long epoch = 0;
   private String fencerInfo;
+  private Daemon syncThread = null;
+  private Configuration conf;
+  
+  // Flags to indicate whether to start sync
+  private boolean toStartSync = false;
+  private long syncSinceTxid = -1;
 
   private final Journal journal;
   private final JournalListener listener;
@@ -128,12 +141,35 @@ public class JournalService implements J
       current = State.WAITING_FOR_ROLL;
     }
 
-    synchronized void startLogSegment() {
+    synchronized State startLogSegment() {
+      State prevState = current;
       if (current == State.WAITING_FOR_ROLL) {
         current = State.SYNCING;
       }
+      return prevState;
     }
+    
+    /**
+     * Try to transit to IN_SYNC state
+     * @return the current state; if the returned state is not IN_SYNC, the
+     *         caller should treat the transition as failed
+     */
+    synchronized State inSync() {
+      if (current == State.IN_SYNC) {
+        throw new IllegalStateException("Service cannot be in " + current
+            + " state.");
+      }
 
+      if (current == State.SYNCING) {
+        current = State.IN_SYNC;
+      }
+      return current;
+    }
+    
+    synchronized void fence() {     
+      current = State.WAITING_FOR_ROLL;
+    }
+    
     synchronized void isStartLogSegmentAllowed() throws IOException {
       if (!current.isStartLogSegmentAllowed) {
         throw new IOException("Cannot start log segment in " + current
@@ -168,18 +204,21 @@ public class JournalService implements J
    *          {@code server} is a valid server that is managed out side this
    *          service.
    * @param listener call-back interface to listen to journal activities
+   * @param journal the journal used by both Listener and JournalService
    * @throws IOException on error
    */
   JournalService(Configuration conf, InetSocketAddress nnAddr,
-      InetSocketAddress serverAddress, JournalListener listener)
-      throws IOException {
+      InetSocketAddress serverAddress, InetSocketAddress httpAddress,
+      JournalListener listener, Journal journal) throws IOException {
     this.nnAddress = nnAddr;
     this.listener = listener;
-    this.journal = new Journal(conf);
+    this.journal = journal;
     this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr,
         NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true)
         .getProxy();
     this.rpcServer = createRpcServer(conf, serverAddress, this);
+    this.httpServer = new JournalHttpServer(conf, journal, httpAddress); 
+    this.conf = conf;
   }
   
   Journal getJournal() {
@@ -201,13 +240,18 @@ public class JournalService implements J
 
   /**
    * Start the service.
+   * @throws IOException on error
    */
-  public void start() {
+  public void start() throws IOException {
     stateHandler.start();
 
     // Start the RPC server
     LOG.info("Starting journal service rpc server");
     rpcServer.start();
+    
+    // Start the HTTP server
+    LOG.info("Starting journal service http server");
+    httpServer.start();
 
     for (boolean registered = false, handshakeComplete = false;;) {
       try {
@@ -239,6 +283,14 @@ public class JournalService implements J
     }
 
     stateHandler.waitForRoll();
+
+    // Create a never ending daemon to sync journal segments
+    // TODO: remove the assumption that "won't delete logs"
+    // use 3 because NN rolls with txid=3 when the first journal service joins.
+    // need to fix this after NN is modified to ignore its local storage dir
+    syncThread = new Daemon(new JournalSync(this));
+    syncThread.start();
+
     try {
       namenode.rollEditLog();
     } catch (IOException e) {
@@ -249,9 +301,12 @@ public class JournalService implements J
   /**
    * Stop the service. For application with RPC Server managed outside, the
    * RPC Server must be stopped the application.
+   * @throws IOException on error
    */
   public void stop() throws IOException {
     if (!stateHandler.isStopped()) {
+      syncThread.interrupt();
+      httpServer.stop();
       rpcServer.stop();
       journal.close();
     }
@@ -277,7 +332,11 @@ public class JournalService implements J
     stateHandler.isStartLogSegmentAllowed();
     verify(epoch, journalInfo);
     listener.startLogSegment(this, txid);
-    stateHandler.startLogSegment();
+    
+    if (stateHandler.startLogSegment() == State.WAITING_FOR_ROLL) {
+      LOG.info("Notify syncThread to re-sync with txid:" + syncSinceTxid);
+      startSync(syncSinceTxid);
+    }
   }
 
   @Override
@@ -294,7 +353,8 @@ public class JournalService implements J
     long previousEpoch = epoch;
     this.epoch = epoch;
     this.fencerInfo = fencerInfo;
-    
+    stateHandler.fence();
+
     // TODO:HDFS-3092 set lastTransId and inSync
     return new FenceResponse(previousEpoch, 0, false);
   }
@@ -310,8 +370,8 @@ public class JournalService implements J
     }
     verify(journalInfo.getNamespaceId(), journalInfo.getClusterId());
 
-    //journal has only one storage directory
-    return journal.getRemoteEditLogs(sinceTxId);
+    // Journal has only one storage directory
+    return journal.getEditLogManifest(sinceTxId);
   }
 
   /** Create an RPC server. */
@@ -435,4 +495,150 @@ public class JournalService implements J
 
     return new JournalSyncProtocolTranslatorPB((JournalSyncProtocolPB) proxy);
   }
+  
+  /**
+   * Only invoked by the sync thread to wait for {@link #syncSinceTxid} to be
+   * set so that syncing can start.
+   * 
+   * @return txid to start syncing from
+   * @throws InterruptedException 
+   */
+  synchronized long waitForStartSync() throws InterruptedException {
+    while (!toStartSync) {
+      wait();
+    }
+    // Sync starting - Unset toStartSync so main thread can set it again
+    toStartSync = false;
+    return syncSinceTxid;
+  }
+
+  /**
+   * Only invoked by main thread to notify sync thread that another round of
+   * sync is needed
+   */
+  synchronized void startSync(long sinceTxid) {
+    if (toStartSync) {
+      LOG.trace("toStartSync is already set.");
+      return;
+    }
+    toStartSync = true;
+    syncSinceTxid = sinceTxid;
+    notify();
+  }
+
+  /**
+   * JournalSync downloads journal segments from other journal services
+   */
+  class JournalSync implements Runnable {
+    private final JournalInfo journalInfo;
+    private final JournalService journalService;
+    private long sinceTxid;
+
+    /**
+     * Constructor
+     * @param journalService Local journal service
+     */
+    JournalSync(JournalService journalService) {
+      NNStorage storage = journalService.getJournal().getStorage();
+      this.journalInfo = new JournalInfo(storage.layoutVersion,
+          storage.clusterID, storage.namespaceID);
+      this.sinceTxid = 0;
+      this.journalService = journalService;
+    }
+
+    public void run() {
+      while (true) {
+        try {
+          sinceTxid = journalService.waitForStartSync();
+          syncAllJournalSegments(conf, journalInfo, sinceTxid);
+        } catch (IOException e) {
+          LOG.error("Unable to sync for "
+              + journalService.getRegistration().getHttpAddress()
+              + " with exception: " + e);
+          try {
+            Thread.sleep(60000);
+          } catch (InterruptedException e1) {
+            break;
+          }
+        } catch (InterruptedException ie) {
+          break;
+        }
+      }
+      LOG.info("Stopping the JouranlSync thread");
+    }
+
+    public String toString() {
+      return "JournalSync for "
+          + journalService.getRegistration().getHttpAddress();
+    }
+
+    /**
+     * Contact journal service one by one to get all missed journal segments
+     * 
+     * @param conf  Configuration
+     * @param journalInfo  the JournalInfo of the local journal service
+     * @param sinceTxid  the transaction id to start with
+     * @throws IOException
+     */
+    void syncAllJournalSegments(Configuration conf, JournalInfo journalInfo,
+        long sinceTxid) throws IOException {
+
+      // Get a list of configured journal services
+      Collection<InetSocketAddress> addrList = DFSUtil
+          .getJournalNodeHttpAddresses(conf);
+      FSEditLog editLog = journal.getEditLog();
+      File currentDir = new File(
+          conf.get(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY));
+
+      boolean needSync = !editLog.hasCompleteJournalSegments(sinceTxid,
+          currentDir);
+      if (!needSync) {
+        LOG.trace("Nothing to sync.");
+        return;
+      }
+
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      InetSocketAddress selfAddr = journalService.httpServer.getHttpAddress();
+      for (InetSocketAddress addr : addrList) {
+        try {
+          // Skip itself
+          if (addr.getHostName().equals(selfAddr.getHostName())
+              && addr.getPort() == selfAddr.getPort()) {
+            continue;
+          }
+          // Download journal segments
+          InetSocketAddress rpcAddr = DFSUtil.getJournalRpcAddrFromHostName(
+              conf, addr.getHostName());
+          JournalSyncProtocol syncProxy = createProxyWithJournalSyncProtocol(
+              rpcAddr, conf, ugi);
+          RemoteEditLogManifest manifest = syncProxy.getEditLogManifest(
+              journalInfo, sinceTxid);
+          httpServer.downloadEditFiles(NetUtils.getHostPortString(addr),
+              manifest);
+        } catch (IOException e) {
+          LOG.debug("Sync failed for " + selfAddr + "with exception ", e);
+          // Ignore error and try the next journal service
+        }
+
+        if (editLog.hasCompleteJournalSegments(sinceTxid, currentDir)) {
+          needSync = false;
+          break;
+        }
+      }
+
+      if (needSync) {
+        throw new IOException("Journal sync failed.");
+      }
+
+      // Journal service may not be in SYNCING state
+      State jState = stateHandler.inSync();
+      if (jState != State.IN_SYNC) {
+        LOG.debug("Journal service state changed during syncing : " + jState);
+      } else {
+        LOG.debug("Journal sync is done.");
+        // TODO: report IN_SYNC state to NN. Note that, it's ok if state changes
+        // to another state because NN could reject the IN_SYNC report
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
Sun May  6 21:45:35 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na
 
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.URI;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
@@ -437,6 +439,59 @@ public class FSEditLog  {
   }
   
   /**
+   * Check if current node has complete set of journal segments from sinceTxid
+   * to (curSegmentTxId-1)
+   * @param sinceTxid the transaction to start with
+   * @param currentDir the storage directory to find journal segments
+   * @return true if journal segments are complete, otherwise false
+   * @throws IOException
+   */
+  public synchronized boolean hasCompleteJournalSegments(long sinceTxid,
+      File currentDir) throws IOException {
+    List<RemoteEditLog> segments = getFinalizedSegments(sinceTxid, currentDir);
+    long curSegTxid = getCurSegmentTxId();
+    return hasCompleteJournalSegments(segments, sinceTxid, curSegTxid);
+  }
+
+  public static boolean hasCompleteJournalSegments(
+      List<RemoteEditLog> segments, long sinceTxid, long curSegTxid)
+      throws IOException {
+    long minTxid = -1, maxTxid = -1;
+
+    if (sinceTxid > curSegTxid) {
+      throw new RuntimeException("illegal input: sinceTxid=" + sinceTxid
+          + " curSegTxid= " + curSegTxid);
+    }
+    if (segments.size() == 0) {      
+      return false;
+    }
+
+    for (RemoteEditLog log : segments) {
+      if (log.getStartTxId() > curSegTxid || log.getEndTxId() > curSegTxid) {
+        throw new RuntimeException("Invalid log: [" + log.getStartTxId() + ","
+            + log.getEndTxId() + "] and curSegTxid=" + curSegTxid);
+      }
+      
+      if (minTxid == -1 || minTxid == -1) {
+        minTxid = log.getStartTxId();
+        maxTxid = log.getEndTxId();
+        assert (minTxid > 0 && maxTxid > 0 && maxTxid > minTxid);
+      } else {
+        // check gap in the middle
+        if (maxTxid != log.getStartTxId() - 1) {
+          return false;
+        }
+        maxTxid = log.getEndTxId();
+      }
+    }
+    // check gap at ends
+    if (minTxid > sinceTxid || maxTxid < curSegTxid - 1) {
+      return false;
+    }    
+    return true;
+  }
+  
+  /**
    * Set the transaction ID to use for the next transaction written.
    */
   synchronized void setNextTxId(long nextTxId) {
@@ -860,7 +915,15 @@ public class FSEditLog  {
       throws IOException {
     return journalSet.getEditLogManifest(fromTxId);
   }
- 
+  
+  /**
+   * Return a list of what finalized edit logs are available
+   */
+  public synchronized List<RemoteEditLog> getFinalizedSegments(long fromTxId,
+      File currentDir) throws IOException {
+    return journalSet.getFinalizedSegments(fromTxId, currentDir);
+  }
+  
   /**
    * Finalizes the current edit log and opens a new log segment.
    * @return the transaction id of the BEGIN_LOG_SEGMENT transaction

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
Sun May  6 21:45:35 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -618,6 +619,39 @@ public class JournalSet implements Journ
   }
 
   /**
+   * Returns a list of finalized edit logs that are available in given
+   * storageDir {@code currentDir}. All available edit logs are returned
+   * starting from the transaction id {@code fromTxId}. Note that the list may
+   * have gaps in these finalized edit logs.
+   * 
+   * @param fromTxId Starting transaction id to read the logs.
+   * @param currentDir The storage directory to find the logs.
+   * @return a list of finalized segments.
+   */
+  public synchronized List<RemoteEditLog> getFinalizedSegments(long fromTxId,
+      File currentDir) {
+    List<RemoteEditLog> allLogs = null;
+    for (JournalAndStream j : journals) {
+      if (j.getManager() instanceof FileJournalManager) {
+        FileJournalManager fjm = (FileJournalManager) j.getManager();
+        if (fjm.getStorageDirectory().getRoot().equals(currentDir)) {
+          try {
+            allLogs = fjm.getRemoteEditLogs(fromTxId);
+            break;
+          } catch (IOException t) {
+            LOG.warn("Cannot list edit logs in " + fjm, t);
+          }
+        }
+      }
+    }
+    // sort collected segments
+    if (allLogs != null && allLogs.size() > 0) {
+      Collections.sort(allLogs);
+    }
+    return allLogs;
+  }
+
+  /**
    * Add sync times to the buffer.
    */
   String getSyncTimes() {

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
Sun May  6 21:45:35 2012
@@ -587,5 +587,16 @@ public class TestDFSUtil {
     for (InetSocketAddress addr: ret) {
       assertTrue(addr.equals(isa1) || addr.equals(isa2));
     }
+    
+    // Test Http addresses
+    InetSocketAddress isa3 = NetUtils.createSocketAddr("localhost:50100");
+    InetSocketAddress isa4 = NetUtils.createSocketAddr("localhost:50110");
+    conf.set(DFS_JOURNAL_HTTP_ADDRESS_KEY, "localhost:50100, localhost:50110");
+    ret = DFSUtil.getJournalNodeHttpAddresses(conf);
+    assertEquals(ret.size(), 2);
+    
+    for (InetSocketAddress addr: ret) {
+      assertTrue(addr.equals(isa3) || addr.equals(isa4));
+    }
   }
 }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournal.java
Sun May  6 21:45:35 2012
@@ -22,6 +22,7 @@ import java.io.File;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -35,6 +36,9 @@ public class TestJournal {
   static Configuration newConf(String name) {
     Configuration conf = new HdfsConfiguration();
     File dir = new File(MiniDFSCluster.getBaseDirectory(), name + "-edits");
+    if (dir.exists()) {
+      Assert.assertTrue(FileUtil.fullyDelete(dir));
+    }
     Assert.assertTrue(dir.mkdirs());
     conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, dir.toURI().toString());
     return conf;

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalHttpServer.java
Sun May  6 21:45:35 2012
@@ -20,9 +20,15 @@ package org.apache.hadoop.hdfs.server.jo
 import static org.junit.Assert.*;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,20 +39,17 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.journalservice.JournalHttpServer;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
-import org.apache.hadoop.hdfs.server.protocol.JournalSyncProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 public class TestJournalHttpServer {
   public static final Log LOG = LogFactory.getLog(TestJournalHttpServer.class);
@@ -85,7 +88,7 @@ public class TestJournalHttpServer {
   }
 
   /**
-   * Test Journal service Http Server
+   * Test Journal service Http Server by verifying the html page is accessible
    * 
    * @throws Exception
    */
@@ -116,79 +119,113 @@ public class TestJournalHttpServer {
         cluster.shutdown();
     }
   }
+  
+  /**
+   * Test hasCompleteJournalSegments with different log list combinations
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testHasCompleteJournalSegments() throws Exception {
+    List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>();
+    logs.add(new RemoteEditLog(3,6));
+    logs.add(new RemoteEditLog(7,10));
+    logs.add(new RemoteEditLog(11,12));
+
+    assertTrue(FSEditLog.hasCompleteJournalSegments(logs, 3, 13));
+    assertFalse(FSEditLog.hasCompleteJournalSegments(logs, 1, 13));
+    assertFalse(FSEditLog.hasCompleteJournalSegments(logs, 13, 19));
+    assertFalse(FSEditLog.hasCompleteJournalSegments(logs, 11, 19));
+
+    logs.remove(1);
+    assertFalse(FSEditLog.hasCompleteJournalSegments(logs, 3, 13));
+    assertFalse(FSEditLog.hasCompleteJournalSegments(logs, 1, 13));
+    assertFalse(FSEditLog.hasCompleteJournalSegments(logs, 3, 19));
+  }
+  
+  private void copyNNFiles(MiniDFSCluster cluster, File dstDir)
+      throws IOException {
+    Collection<URI> editURIs = cluster.getNameEditsDirs(0);
+    String firstURI = editURIs.iterator().next().getPath().toString();
+    File nnDir = new File(new String(firstURI + "/current"));
+   
+    File allFiles[] = FileUtil.listFiles(nnDir);
+    for (File f : allFiles) {
+      IOUtils.copyBytes(new FileInputStream(f),
+          new FileOutputStream(dstDir + "/" + f.getName()), 4096, true);
+    }
+  }
 
   /**
    * Test lagging Journal service copies edit segments from another Journal
-   * service
+   * service: 
+   * 1. start one journal service 
+   * 2. reboot namenode so more segments are created
+   * 3. add another journal service and this new journal service should sync
+   *    with the first journal service
    * 
    * @throws Exception
    */
   @Test
   public void testCopyEdits() throws Exception {
     MiniDFSCluster cluster = null;
-    JournalService service = null;
-    JournalHttpServer jhs1 = null, jhs2 = null;
+    JournalService service1 = null, service2 = null;
 
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
 
-      // restart namenode, so it will have finalized edit segments
-      cluster.restartNameNode();
-
+      // start journal service
       conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path1.getPath());
       InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
-      InetSocketAddress serverAddr = new InetSocketAddress(50900);
-      JournalListener listener = Mockito.mock(JournalListener.class);
-      service = new JournalService(conf, nnAddr, serverAddr, listener);
-      service.start();
-      
+      InetSocketAddress serverAddr = NetUtils
+          .createSocketAddr("localhost:50900");
+      Journal j1 = new Journal(conf);
+      JournalListener listener1 = new JournalDiskWriter(j1);
+      service1 = new JournalService(conf, nnAddr, serverAddr,
+          new InetSocketAddress(50901), listener1, j1);
+      service1.start();
+
       // get namenode clusterID/layoutVersion/namespaceID
-      StorageInfo si = service.getJournal().getStorage();
+      StorageInfo si = service1.getJournal().getStorage();
       JournalInfo journalInfo = new JournalInfo(si.layoutVersion, si.clusterID,
           si.namespaceID);
 
-      // start jns1 with path1
-      jhs1 = new JournalHttpServer(conf, service.getJournal(),
-          NetUtils.createSocketAddr("localhost:50200"));
-      jhs1.start();
+      // restart namenode, so there will be one more journal segment
+      cluster.restartNameNode();
 
-      // get all edit segments
-      InetSocketAddress srcRpcAddr = NameNode.getServiceAddress(conf, true);
-      NamenodeProtocol namenode = NameNodeProxies.createNonHAProxy(conf,
-          srcRpcAddr, NamenodeProtocol.class,
-          UserGroupInformation.getCurrentUser(), true).getProxy();
-
-      RemoteEditLogManifest manifest = namenode.getEditLogManifest(1);      
-      jhs1.downloadEditFiles(
-          conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY), manifest);
+      // TODO: remove file copy when NN can work with journal auto-machine
+      copyNNFiles(cluster, new File(new String(path1.toString() + "/current")));
 
-      // start jns2 with path2
+      // start another journal service that will do the sync
+      conf.set(DFSConfigKeys.DFS_JOURNAL_ADDRESS_KEY, "localhost:50900");
       conf.set(DFSConfigKeys.DFS_JOURNAL_EDITS_DIR_KEY, path2.getPath());
-      Journal journal2 = new Journal(conf);
-      journal2.format(si.namespaceID, si.clusterID);   
-      jhs2 = new JournalHttpServer(conf, journal2,
-          NetUtils.createSocketAddr("localhost:50300"));
-      jhs2.start();
-
-      // transfer edit logs from j1 to j2
-      JournalSyncProtocol journalp = JournalService.createProxyWithJournalSyncProtocol(
-          NetUtils.createSocketAddr("localhost:50900"), conf,
-          UserGroupInformation.getCurrentUser());
-      RemoteEditLogManifest manifest1 = journalp.getEditLogManifest(journalInfo, 1);  
-      jhs2.downloadEditFiles("localhost:50200", manifest1);
+      conf.set(DFSConfigKeys.DFS_JOURNAL_HTTP_ADDRESS_KEY,
+          "localhost:50902, localhost:50901");
+      Journal j2 = new Journal(conf);
+      JournalListener listener2 = new JournalDiskWriter(j2);
+      service2 = new JournalService(conf, nnAddr, new InetSocketAddress(50800),
+          NetUtils.createSocketAddr("localhost:50902"), listener2, j2);
+      service2.start();
+
+      // give service2 some time to sync
+      Thread.sleep(5000);
+      
+      // TODO: change sinceTxid to 1 after NN is modified to use journal
+      // service to start
+      RemoteEditLogManifest manifest2 = service2.getEditLogManifest(
+          journalInfo, 3);
+      assertTrue(manifest2.getLogs().size() > 0);
 
     } catch (IOException e) {
       LOG.error("Error in TestCopyEdits:", e);
       assertTrue(e.getLocalizedMessage(), false);
     } finally {
-      if (jhs1 != null)
-        jhs1.stop();
-      if (jhs2 != null)
-        jhs2.stop();
       if (cluster != null)
         cluster.shutdown();
-      if (service != null)
-        service.stop();
+      if (service1 != null)
+        service1.stop();
+      if (service2 != null)
+        service2.stop();
     }
   }
 }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java?rev=1334792&r1=1334791&r2=1334792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
(original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/journalservice/TestJournalService.java
Sun May  6 21:45:35 2012
@@ -52,6 +52,7 @@ public class TestJournalService {
   @Test
   public void testCallBacks() throws Exception {
     Configuration conf = TestJournal.newConf("testCallBacks");
+    Journal journal = new Journal(conf);
     JournalListener listener = Mockito.mock(JournalListener.class);
     JournalService service = null;
     MiniDFSCluster cluster = null;
@@ -59,7 +60,7 @@ public class TestJournalService {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
       cluster.waitActive(0);
       InetSocketAddress nnAddr = cluster.getNameNode(0).getNameNodeAddress();
-      service = newJournalService(nnAddr, listener, conf);
+      service = newJournalService(nnAddr, listener, journal, conf);
       service.start();
       verifyRollLogsCallback(service, listener);
       verifyJournalCallback(cluster.getFileSystem(), service, listener);
@@ -73,6 +74,10 @@ public class TestJournalService {
     }
   }
 
+  /**
+   * Test journal service fence with a combination of epoch, nsid and clusterid
+   * @throws Exception
+   */
   @Test
   public void testFence() throws Exception {
     final Configuration conf = TestJournal.newConf("testFence");
@@ -86,7 +91,8 @@ public class TestJournalService {
       cluster.waitActive(0);
       NameNode nn = cluster.getNameNode(0);
       nnAddress = nn.getNameNodeAddress();
-      service = newJournalService(nnAddress, listener, conf);
+      Journal j1 = new Journal(conf);
+      service = newJournalService(nnAddress, listener, j1, conf);
       service.start();
       String cid = nn.getNamesystem().getClusterId();
       int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
@@ -101,17 +107,29 @@ public class TestJournalService {
     StorageInfo before = service.getJournal().getStorage();
     LOG.info("before: " + before);
     service.stop();
-    service = newJournalService(nnAddress, listener, conf);
+    Journal j2 = new Journal(conf);
+    service = newJournalService(nnAddress, listener, j2, conf);
     StorageInfo after = service.getJournal().getStorage();
     LOG.info("after : " + after);
     Assert.assertEquals(before.toString(), after.toString());
   }
 
+  /**
+   * Create additional journal service
+   * @param nnAddr  namenode rpc address
+   * @param listener the listener serving journal requests from namenode
+   * @param journal local journal
+   * @param conf local configuration 
+   * @return journal service
+   * @throws Exception
+   */
   private JournalService newJournalService(InetSocketAddress nnAddr,
-      JournalListener listener, Configuration conf) throws IOException {
-    return new JournalService(conf, nnAddr, RPC_ADDR, listener);
+      JournalListener listener, Journal journal, Configuration conf)
+      throws Exception {
+    return new JournalService(conf, nnAddr, RPC_ADDR, new InetSocketAddress(0),
+        listener, journal);
   }
-  
+
   /**
    * Starting {@link JournalService} should result in Namenode calling
    * {@link JournalService#startLogSegment}, resulting in callback 
@@ -135,6 +153,16 @@ public class TestJournalService {
         Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any());
   }
   
+  /**
+   * Verify the fence with different epoch, clusterid and nsid combinations
+   *  
+   * @param s The Journal Service to write to 
+   * @param listener the listener to serve journal request
+   * @param cid cluster id
+   * @param nsId namespace id
+   * @param lv layoutVersion
+   * @throws Exception
+   */
   void verifyFence(JournalService s, JournalListener listener,
       String cid, int nsId, int lv) throws Exception {
     



Mime
View raw message