hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1561516 [2/4] - in /hadoop/common/branches/HDFS-5535/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/conf/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/...
Date: Sun, 26 Jan 2014 16:34:35 GMT
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Sun Jan 26 16:34:30 2014
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
 import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.qjournal.server.GetJournalEditServlet;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -564,6 +565,72 @@ public class IPCLoggerChannel implements
       }
     });
   }
+  
+  @Override
+  public ListenableFuture<Void> doPreUpgrade() {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getProxy().doPreUpgrade(journalId);
+        return null;
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Void> doUpgrade(final StorageInfo sInfo) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getProxy().doUpgrade(journalId, sInfo);
+        return null;
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Void> doFinalize() {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getProxy().doFinalize(journalId);
+        return null;
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Boolean> canRollBack(final StorageInfo storage,
+      final StorageInfo prevStorage, final int targetLayoutVersion) {
+    return executor.submit(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws IOException {
+        return getProxy().canRollBack(journalId, storage, prevStorage,
+            targetLayoutVersion);
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<Void> doRollback() {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getProxy().doRollback(journalId);
+        return null;
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Long> getJournalCTime() {
+    return executor.submit(new Callable<Long>() {
+      @Override
+      public Long call() throws IOException {
+        return getProxy().getJournalCTime(journalId);
+      }
+    });
+  }
 
   @Override
   public String toString() {
@@ -636,4 +703,5 @@ public class IPCLoggerChannel implements
   private boolean hasHttpServerEndPoint() {
    return httpServerURL != null;
   }
+
 }
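
All six methods added to IPCLoggerChannel above share one shape: the blocking
QJournalProtocol RPC is handed to the channel's executor and surfaced as a
Guava ListenableFuture, so the quorum layer can fan the same call out to every
JournalNode in parallel. A minimal self-contained sketch of that shape (the
RemoteService interface and all names below are illustrative, not part of this
patch):

    import java.util.concurrent.Callable;
    import java.util.concurrent.Executors;

    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;

    public class AsyncChannelSketch {
      // Hypothetical stand-in for the QJournalProtocol proxy.
      interface RemoteService {
        long getValue() throws Exception;
      }

      // A single-threaded executor keeps calls to one JournalNode ordered.
      private final ListeningExecutorService executor =
          MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

      public ListenableFuture<Long> getValueAsync(final RemoteService proxy) {
        return executor.submit(new Callable<Long>() {
          @Override
          public Long call() throws Exception {
            // The blocking RPC runs off the caller's thread; the future
            // completes (or fails) when the remote call returns.
            return proxy.getValue();
          }
        });
      }
    }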

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Sun Jan 26 16:34:30 2014
@@ -34,10 +34,13 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
@@ -77,8 +80,14 @@ public class QuorumJournalManager implem
   // Since these don't occur during normal operation, we can
   // use rather lengthy timeouts, and don't need to make them
   // configurable.
-  private static final int FORMAT_TIMEOUT_MS = 60000;
-  private static final int HASDATA_TIMEOUT_MS = 60000;
+  private static final int FORMAT_TIMEOUT_MS            = 60000;
+  private static final int HASDATA_TIMEOUT_MS           = 60000;
+  private static final int CAN_ROLL_BACK_TIMEOUT_MS     = 60000;
+  private static final int FINALIZE_TIMEOUT_MS          = 60000;
+  private static final int PRE_UPGRADE_TIMEOUT_MS       = 60000;
+  private static final int ROLL_BACK_TIMEOUT_MS         = 60000;
+  private static final int UPGRADE_TIMEOUT_MS           = 60000;
+  private static final int GET_JOURNAL_CTIME_TIMEOUT_MS = 60000;
   
   private final Configuration conf;
   private final URI uri;
@@ -492,4 +501,131 @@ public class QuorumJournalManager implem
     return loggers;
   }
 
+  @Override
+  public void doPreUpgrade() throws IOException {
+    QuorumCall<AsyncLogger, Void> call = loggers.doPreUpgrade();
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0, PRE_UPGRADE_TIMEOUT_MS,
+          "doPreUpgrade");
+      
+      if (call.countExceptions() > 0) {
+        call.rethrowException("Could not do pre-upgrade of one or more JournalNodes");
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for doPreUpgrade() response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for doPreUpgrade() response");
+    }
+  }
+
+  @Override
+  public void doUpgrade(Storage storage) throws IOException {
+    QuorumCall<AsyncLogger, Void> call = loggers.doUpgrade(storage);
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0, UPGRADE_TIMEOUT_MS,
+          "doUpgrade");
+      
+      if (call.countExceptions() > 0) {
+        call.rethrowException("Could not perform upgrade of one or more JournalNodes");
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for doUpgrade() response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for doUpgrade() response");
+    }
+  }
+  
+  @Override
+  public void doFinalize() throws IOException {
+    QuorumCall<AsyncLogger, Void> call = loggers.doFinalize();
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0, FINALIZE_TIMEOUT_MS,
+          "doFinalize");
+      
+      if (call.countExceptions() > 0) {
+        call.rethrowException("Could not finalize one or more JournalNodes");
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for doFinalize() response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for doFinalize() response");
+    }
+  }
+  
+  @Override
+  public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+      int targetLayoutVersion) throws IOException {
+    QuorumCall<AsyncLogger, Boolean> call = loggers.canRollBack(storage,
+        prevStorage, targetLayoutVersion);
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0, CAN_ROLL_BACK_TIMEOUT_MS,
+          "canRollBack");
+      
+      if (call.countExceptions() > 0) {
+        call.rethrowException("Could not check if rollback is possible for"
+            + " one or more JournalNodes");
+      }
+      
+      // Either they all return the same thing or this call fails, so we can
+      // just return the first result.
+      DFSUtil.assertAllResultsEqual(call.getResults().values());
+      for (Boolean result : call.getResults().values()) {
+        return result;
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for canRollBack() response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for canRollBack() response");
+    }
+    
+    throw new AssertionError("Unreachable code.");
+  }
+
+  @Override
+  public void doRollback() throws IOException {
+    QuorumCall<AsyncLogger, Void> call = loggers.doRollback();
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0, ROLL_BACK_TIMEOUT_MS,
+          "doRollback");
+      
+      if (call.countExceptions() > 0) {
+        call.rethrowException("Could not perform rollback of one or more JournalNodes");
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for doRollback() response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for doRollback() response");
+    }
+  }
+  
+  @Override
+  public long getJournalCTime() throws IOException {
+    QuorumCall<AsyncLogger, Long> call = loggers.getJournalCTime();
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0,
+          GET_JOURNAL_CTIME_TIMEOUT_MS, "getJournalCTime");
+      
+      if (call.countExceptions() > 0) {
+        call.rethrowException("Could not get journal CTime for one "
+            + "or more JournalNodes");
+      }
+      
+      // Either they all return the same thing or this call fails, so we can
+      // just return the first result.
+      DFSUtil.assertAllResultsEqual(call.getResults().values());
+      for (Long result : call.getResults().values()) {
+        return result;
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for getJournalCTime() " +
+          "response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for getJournalCTime() " +
+          "response");
+    }
+    
+    throw new AssertionError("Unreachable code.");
+  }
 }
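
Every method added to QuorumJournalManager above repeats the same
wait-and-rethrow pattern: require a response from every logger (both counts
passed to waitFor() are loggers.size(), unlike normal journaling, which
proceeds on a bare majority), tolerate zero exceptions, and translate
interruption and timeout into IOException. That repetition could be factored
into a helper; a sketch under that assumption (waitForAllLoggers is
hypothetical, the QuorumCall methods are the ones used above):

    // Hypothetical helper. QuorumCall#waitFor(minResponses, minSuccesses,
    // maxExceptions, timeoutMs, operationName) is the signature used above.
    private static <T> void waitForAllLoggers(QuorumCall<AsyncLogger, T> call,
        int numLoggers, int timeoutMs, String opName) throws IOException {
      try {
        call.waitFor(numLoggers, numLoggers, 0, timeoutMs, opName);
        if (call.countExceptions() > 0) {
          call.rethrowException(
              "Could not " + opName + " on one or more JournalNodes");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        throw new IOException("Interrupted waiting for " + opName + " response");
      } catch (TimeoutException e) {
        throw new IOException("Timed out waiting for " + opName + " response");
      }
    }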

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java Sun Jan 26 16:34:30 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.JournalManager;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.security.KerberosInfo;
@@ -143,4 +144,17 @@ public interface QJournalProtocol {
    */
   public void acceptRecovery(RequestInfo reqInfo,
       SegmentStateProto stateToAccept, URL fromUrl) throws IOException;
+
+  public void doPreUpgrade(String journalId) throws IOException;
+
+  public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException;
+
+  public void doFinalize(String journalId) throws IOException;
+
+  public Boolean canRollBack(String journalId, StorageInfo storage,
+      StorageInfo prevStorage, int targetLayoutVersion) throws IOException;
+
+  public void doRollback(String journalId) throws IOException;
+
+  public Long getJournalCTime(String journalId) throws IOException;
 }
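
The six methods added to QJournalProtocol mirror the standard Hadoop
storage-upgrade lifecycle, applied per journal. As a rough sketch of the
intended call order (an assumption about usage, not code from this commit;
qjp, jid, and the StorageInfo arguments are placeholders):

    void upgradeOneJournal(QJournalProtocol qjp, String jid,
        StorageInfo newInfo, StorageInfo oldInfo, int targetLayoutVersion)
        throws IOException {
      qjp.doPreUpgrade(jid);        // stage the current state aside
      qjp.doUpgrade(jid, newInfo);  // write the new layout version and cTime
      // Later, once the upgrade is deemed good:
      qjp.doFinalize(jid);          // discard the saved previous state
      // ...or instead, to abandon the upgrade:
      // if (qjp.canRollBack(jid, newInfo, oldInfo, targetLayoutVersion)) {
      //   qjp.doRollback(jid);     // restore the saved previous state
      // }
    }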

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java Sun Jan 26 16:34:30 2014
@@ -17,17 +17,35 @@
  */
 package org.apache.hadoop.hdfs.qjournal.protocolPB;
 
+import java.io.IOException;
+import java.net.URL;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoRollbackRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoRollbackResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoUpgradeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoUpgradeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
@@ -39,8 +57,6 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
@@ -48,13 +64,11 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
-import java.io.IOException;
-import java.net.URL;
-
 /**
  * Implementation for protobuf service that forwards requests
  * received on {@link JournalProtocolPB} to the 
@@ -244,4 +258,79 @@ public class QJournalProtocolServerSideT
         reqInfo.hasCommittedTxId() ?
           reqInfo.getCommittedTxId() : HdfsConstants.INVALID_TXID);
   }
+
+
+  @Override
+  public DoPreUpgradeResponseProto doPreUpgrade(RpcController controller,
+      DoPreUpgradeRequestProto request) throws ServiceException {
+    try {
+      impl.doPreUpgrade(convert(request.getJid()));
+      return DoPreUpgradeResponseProto.getDefaultInstance();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public DoUpgradeResponseProto doUpgrade(RpcController controller,
+      DoUpgradeRequestProto request) throws ServiceException {
+    try {
+      impl.doUpgrade(convert(request.getJid()),
+          PBHelper.convert(request.getSInfo()));
+      return DoUpgradeResponseProto.getDefaultInstance();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public DoFinalizeResponseProto doFinalize(RpcController controller,
+      DoFinalizeRequestProto request) throws ServiceException {
+    try {
+      impl.doFinalize(convert(request.getJid()));
+      return DoFinalizeResponseProto.getDefaultInstance();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public CanRollBackResponseProto canRollBack(RpcController controller,
+      CanRollBackRequestProto request) throws ServiceException {
+    try {
+      Boolean result = impl.canRollBack(convert(request.getJid()),
+          PBHelper.convert(request.getStorage()),
+          PBHelper.convert(request.getPrevStorage()),
+          request.getTargetLayoutVersion());
+      return CanRollBackResponseProto.newBuilder()
+          .setCanRollBack(result)
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public DoRollbackResponseProto doRollback(RpcController controller, DoRollbackRequestProto request)
+      throws ServiceException {
+    try {
+      impl.doRollback(convert(request.getJid()));
+      return DoRollbackResponseProto.getDefaultInstance();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public GetJournalCTimeResponseProto getJournalCTime(RpcController controller,
+      GetJournalCTimeRequestProto request) throws ServiceException {
+    try {
+      Long resultCTime = impl.getJournalCTime(convert(request.getJid()));
+      return GetJournalCTimeResponseProto.newBuilder()
+          .setResultCTime(resultCTime)
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }
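
Each translator method above follows the fixed protobuf-service shape: unpack
the request, delegate to the QJournalProtocol implementation, wrap the result
in the response proto (or return the default instance for void calls), and
convert IOException into ServiceException so the RPC layer can marshal it back
to the caller. Generically (FooRequestProto/FooResponseProto stand in for any
of the message pairs above):

    public FooResponseProto foo(RpcController controller,
        FooRequestProto request) throws ServiceException {
      try {
        impl.foo(convert(request.getJid()));          // delegate to the impl
        return FooResponseProto.getDefaultInstance(); // empty "void" response
      } catch (IOException e) {
        throw new ServiceException(e);                // marshalled to the client
      }
    }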

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java Sun Jan 26 16:34:30 2014
@@ -23,13 +23,23 @@ import java.net.URL;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoRollbackRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoUpgradeRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
@@ -39,7 +49,6 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
@@ -47,6 +56,7 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.ipc.ProtobufHelper;
@@ -277,4 +287,85 @@ public class QJournalProtocolTranslatorP
         RPC.getProtocolVersion(QJournalProtocolPB.class), methodName);
   }
 
+  @Override
+  public void doPreUpgrade(String jid) throws IOException {
+    try {
+      rpcProxy.doPreUpgrade(NULL_CONTROLLER,
+          DoPreUpgradeRequestProto.newBuilder()
+            .setJid(convertJournalId(jid))
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
+    try {
+      rpcProxy.doUpgrade(NULL_CONTROLLER,
+          DoUpgradeRequestProto.newBuilder()
+            .setJid(convertJournalId(journalId))
+            .setSInfo(PBHelper.convert(sInfo))
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+  
+  @Override
+  public void doFinalize(String jid) throws IOException {
+    try {
+      rpcProxy.doFinalize(NULL_CONTROLLER,
+          DoFinalizeRequestProto.newBuilder()
+            .setJid(convertJournalId(jid))
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public Boolean canRollBack(String journalId, StorageInfo storage,
+      StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
+    try {
+      CanRollBackResponseProto response = rpcProxy.canRollBack(
+          NULL_CONTROLLER,
+          CanRollBackRequestProto.newBuilder()
+            .setJid(convertJournalId(journalId))
+            .setStorage(PBHelper.convert(storage))
+            .setPrevStorage(PBHelper.convert(prevStorage))
+            .setTargetLayoutVersion(targetLayoutVersion)
+            .build());
+      return response.getCanRollBack();
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void doRollback(String journalId) throws IOException {
+    try {
+      rpcProxy.doRollback(NULL_CONTROLLER,
+          DoRollbackRequestProto.newBuilder()
+            .setJid(convertJournalId(journalId))
+            .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public Long getJournalCTime(String journalId) throws IOException {
+    try {
+      GetJournalCTimeResponseProto response = rpcProxy.getJournalCTime(
+          NULL_CONTROLLER,
+          GetJournalCTimeRequestProto.newBuilder()
+            .setJid(convertJournalId(journalId))
+            .build());
+      return response.getResultCTime();
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java Sun Jan 26 16:34:30 2014
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
@@ -139,20 +140,26 @@ public class GetJournalEditServlet exten
   private boolean checkStorageInfoOrSendError(JNStorage storage,
       HttpServletRequest request, HttpServletResponse response)
       throws IOException {
-    String myStorageInfoString = storage.toColonSeparatedString();
+    int myNsId = storage.getNamespaceID();
+    String myClusterId = storage.getClusterID();
+    
     String theirStorageInfoString = StringEscapeUtils.escapeHtml(
         request.getParameter(STORAGEINFO_PARAM));
 
-    if (theirStorageInfoString != null
-        && !myStorageInfoString.equals(theirStorageInfoString)) {
-      String msg = "This node has storage info '" + myStorageInfoString
-          + "' but the requesting node expected '"
-          + theirStorageInfoString + "'";
-      
-      response.sendError(HttpServletResponse.SC_FORBIDDEN, msg);
-      LOG.warn("Received an invalid request file transfer request from " +
-          request.getRemoteAddr() + ": " + msg);
-      return false;
+    if (theirStorageInfoString != null) {
+      int theirNsId = StorageInfo.getNsIdFromColonSeparatedString(
+          theirStorageInfoString);
+      String theirClusterId = StorageInfo.getClusterIdFromColonSeparatedString(
+          theirStorageInfoString);
+      if (myNsId != theirNsId || !myClusterId.equals(theirClusterId)) {
+        String msg = "This node has namespaceId '" + myNsId + "' and clusterId '"
+            + myClusterId + "' but the requesting node expected '" + theirNsId
+            + "' and '" + theirClusterId + "'";
+        response.sendError(HttpServletResponse.SC_FORBIDDEN, msg);
+        LOG.warn("Received an invalid file transfer request from " +
+            request.getRemoteAddr() + ": " + msg);
+        return false;
+      }
     }
     return true;
   }
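
The servlet check above is deliberately loosened: the old code compared the
full colon-separated storage string, which embeds the layout version and cTime
and so would spuriously reject transfers between nodes that are mid-upgrade.
Only the namespace ID and cluster ID must now match. The two StorageInfo
helpers are added elsewhere in this branch; assuming toColonSeparatedString()
keeps its layoutVersion:namespaceID:cTime:clusterID layout, they amount to:

    // Assumed format: layoutVersion:namespaceID:cTime:clusterID
    public static int getNsIdFromColonSeparatedString(String in) {
      return Integer.parseInt(in.split(":")[1]);
    }

    public static String getClusterIdFromColonSeparatedString(String in) {
      return in.split(":")[3];
    }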

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java Sun Jan 26 16:34:30 2014
@@ -130,6 +130,10 @@ class JNStorage extends Storage {
     return new File(sd.getCurrentDir(), "paxos");
   }
   
+  File getRoot() {
+    return sd.getRoot();
+  }
+  
   /**
    * Remove any log files and associated paxos files which are older than
    * the given txid.
@@ -182,12 +186,15 @@ class JNStorage extends Storage {
     unlockAll();
     sd.clearDirectory();
     writeProperties(sd);
+    createPaxosDir();
+    analyzeStorage();
+  }
+  
+  void createPaxosDir() throws IOException {
     if (!getPaxosDir().mkdirs()) {
       throw new IOException("Could not create paxos dir: " + getPaxosDir());
     }
-    analyzeStorage();
   }
-
   
   void analyzeStorage() throws IOException {
     this.state = sd.analyzeStorage(StartupOption.REGULAR, this);

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Sun Jan 26 16:34:30 2014
@@ -37,12 +37,14 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
 import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
 import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
@@ -73,7 +75,7 @@ import com.google.protobuf.TextFormat;
  * Each such journal is entirely independent despite being hosted by
  * the same JVM.
  */
-class Journal implements Closeable {
+public class Journal implements Closeable {
   static final Log LOG = LogFactory.getLog(Journal.class);
 
 
@@ -122,8 +124,8 @@ class Journal implements Closeable {
    */
   private BestEffortLongFile committedTxnId;
   
-  private static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
-  private static final String LAST_WRITER_EPOCH = "last-writer-epoch";
+  public static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
+  public static final String LAST_WRITER_EPOCH = "last-writer-epoch";
   private static final String COMMITTED_TXID_FILENAME = "committed-txid";
   
   private final FileJournalManager fjm;
@@ -627,7 +629,7 @@ class Journal implements Closeable {
   }
 
   /**
-   * @see QJournalProtocol#getEditLogManifest(String, long)
+   * @see QJournalProtocol#getEditLogManifest(String, long, boolean)
    */
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
       boolean inProgressOk) throws IOException {
@@ -728,7 +730,7 @@ class Journal implements Closeable {
   }
   
   /**
-   * @see QJournalProtocol#acceptRecovery(RequestInfo, SegmentStateProto, URL)
+   * @see QJournalProtocol#acceptRecovery(RequestInfo, QJournalProtocolProtos.SegmentStateProto, URL)
    */
   public synchronized void acceptRecovery(RequestInfo reqInfo,
       SegmentStateProto segment, URL fromUrl)
@@ -980,4 +982,62 @@ class Journal implements Closeable {
       }
     }
   }
+
+  public synchronized void doPreUpgrade() throws IOException {
+    storage.getJournalManager().doPreUpgrade();
+  }
+
+  public synchronized void doUpgrade(StorageInfo sInfo) throws IOException {
+    long oldCTime = storage.getCTime();
+    storage.cTime = sInfo.cTime;
+    int oldLV = storage.getLayoutVersion();
+    storage.layoutVersion = sInfo.layoutVersion;
+    LOG.info("Starting upgrade of edits directory: "
+        + ".\n   old LV = " + oldLV
+        + "; old CTime = " + oldCTime
+        + ".\n   new LV = " + storage.getLayoutVersion()
+        + "; new CTime = " + storage.getCTime());
+    storage.getJournalManager().doUpgrade(storage);
+    storage.createPaxosDir();
+    
+    // Copy over the contents of the epoch data files to the new dir.
+    File currentDir = storage.getSingularStorageDir().getCurrentDir();
+    File previousDir = storage.getSingularStorageDir().getPreviousDir();
+    
+    PersistentLongFile prevLastPromisedEpoch = new PersistentLongFile(
+        new File(previousDir, LAST_PROMISED_FILENAME), 0);
+    PersistentLongFile prevLastWriterEpoch = new PersistentLongFile(
+        new File(previousDir, LAST_WRITER_EPOCH), 0);
+    
+    lastPromisedEpoch = new PersistentLongFile(
+        new File(currentDir, LAST_PROMISED_FILENAME), 0);
+    lastWriterEpoch = new PersistentLongFile(
+        new File(currentDir, LAST_WRITER_EPOCH), 0);
+    
+    lastPromisedEpoch.set(prevLastPromisedEpoch.get());
+    lastWriterEpoch.set(prevLastWriterEpoch.get());
+  }
+
+  public synchronized void doFinalize() throws IOException {
+    LOG.info("Finalizing upgrade for journal " 
+          + storage.getRoot() + "."
+          + (storage.getLayoutVersion()==0 ? "" :
+            "\n   cur LV = " + storage.getLayoutVersion()
+            + "; cur CTime = " + storage.getCTime()));
+    storage.getJournalManager().doFinalize();
+  }
+
+  public Boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+      int targetLayoutVersion) throws IOException {
+    return this.storage.getJournalManager().canRollBack(storage, prevStorage,
+        targetLayoutVersion);
+  }
+
+  public void doRollback() throws IOException {
+    storage.getJournalManager().doRollback();
+  }
+
+  public Long getJournalCTime() throws IOException {
+    return storage.getJournalManager().getJournalCTime();
+  }
 }
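
Note what doUpgrade() above preserves by hand: once the underlying storage has
swapped current/ into previous/ and the paxos directory has been recreated,
the last-promised and last-writer epoch values are read back out of previous/
and rewritten into the new current/, so no writer can be accepted with a stale
epoch after the upgrade. PersistentLongFile is simply a long stored in a file,
with a default when the file is absent; illustratively:

    // Illustrative use of PersistentLongFile (the second argument is the
    // default returned while the file does not yet exist).
    PersistentLongFile epoch = new PersistentLongFile(
        new File(currentDir, "last-promised-epoch"), 0);
    long promised = epoch.get();  // 0 on a freshly formatted journal
    epoch.set(promised + 1);      // writes the new value back to disk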

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java Sun Jan 26 16:34:30 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
 import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
@@ -285,4 +286,31 @@ public class JournalNode implements Tool
     StringUtils.startupShutdownMessage(JournalNode.class, args, LOG);
     System.exit(ToolRunner.run(new JournalNode(), args));
   }
+
+  public void doPreUpgrade(String journalId) throws IOException {
+    getOrCreateJournal(journalId).doPreUpgrade();
+  }
+
+  public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
+    getOrCreateJournal(journalId).doUpgrade(sInfo);
+  }
+
+  public void doFinalize(String journalId) throws IOException {
+    getOrCreateJournal(journalId).doFinalize();
+  }
+
+  public Boolean canRollBack(String journalId, StorageInfo storage,
+      StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
+    return getOrCreateJournal(journalId).canRollBack(storage, prevStorage,
+        targetLayoutVersion);
+  }
+
+  public void doRollback(String journalId) throws IOException {
+    getOrCreateJournal(journalId).doRollback();
+  }
+
+  public Long getJournalCTime(String journalId) throws IOException {
+    return getOrCreateJournal(journalId).getJournalCTime();
+  }
+
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Sun Jan 26 16:34:30 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
 import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
 import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -205,4 +206,35 @@ class JournalNodeRpcServer implements QJ
         .acceptRecovery(reqInfo, log, fromUrl);
   }
 
+  @Override
+  public void doPreUpgrade(String journalId) throws IOException {
+    jn.doPreUpgrade(journalId);
+  }
+
+  @Override
+  public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
+    jn.doUpgrade(journalId, sInfo);
+  }
+
+  @Override
+  public void doFinalize(String journalId) throws IOException {
+    jn.doFinalize(journalId);
+  }
+
+  @Override
+  public Boolean canRollBack(String journalId, StorageInfo storage,
+      StorageInfo prevStorage, int targetLayoutVersion)
+      throws IOException {
+    return jn.canRollBack(journalId, storage, prevStorage, targetLayoutVersion);
+  }
+
+  @Override
+  public void doRollback(String journalId) throws IOException {
+    jn.doRollback(journalId);
+  }
+
+  @Override
+  public Long getJournalCTime(String journalId) throws IOException {
+    return jn.getJournalCTime(journalId);
+  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Sun Jan 26 16:34:30 2014
@@ -337,6 +337,7 @@ public class Balancer {
         sock.connect(
             NetUtils.createSocketAddr(target.datanode.getXferAddr()),
             HdfsServerConstants.READ_TIMEOUT);
+        sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
         sock.setKeepAlive(true);
         
         OutputStream unbufOut = sock.getOutputStream();
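
The one-line Balancer change closes a hang risk: connect() above already
bounds how long connection establishment may take, but without SO_TIMEOUT a
later read on the transfer socket could block indefinitely on an unresponsive
datanode. In plain java.net terms (host and port below are made up):

    Socket sock = new Socket();
    // Bound connection establishment to 60 seconds...
    sock.connect(new InetSocketAddress("dn1.example.com", 50010), 60000);
    // ...and bound every subsequent read; a stalled peer now surfaces as a
    // SocketTimeoutException instead of hanging the dispatcher thread.
    sock.setSoTimeout(60000);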

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Sun Jan 26 16:34:30 2014
@@ -79,7 +79,7 @@ public class BlockPlacementPolicyDefault
    */
   protected int tolerateHeartbeatMultiplier;
 
-  BlockPlacementPolicyDefault(Configuration conf,  FSClusterStats stats,
+  protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
                            NetworkTopology clusterMap) {
     initialize(conf, stats, clusterMap);
   }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Sun Jan 26 16:34:30 2014
@@ -46,12 +46,12 @@ import org.apache.hadoop.net.NodeBase;
  */
 public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
 
-  BlockPlacementPolicyWithNodeGroup(Configuration conf,  FSClusterStats stats,
+  protected BlockPlacementPolicyWithNodeGroup(Configuration conf,  FSClusterStats stats,
       NetworkTopology clusterMap) {
     initialize(conf, stats, clusterMap);
   }
 
-  BlockPlacementPolicyWithNodeGroup() {
+  protected BlockPlacementPolicyWithNodeGroup() {
   }
 
   @Override
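
Widening these constructors from package-private to protected allows block
placement policies to be subclassed from outside
org.apache.hadoop.hdfs.server.blockmanagement. A minimal example of what that
enables (MyPlacementPolicy is hypothetical):

    // Hypothetical subclass in another package, now possible because the
    // superclass constructors are protected rather than package-private.
    public class MyPlacementPolicy extends BlockPlacementPolicyDefault {
      protected MyPlacementPolicy(Configuration conf, FSClusterStats stats,
          NetworkTopology clusterMap) {
        super(conf, stats, clusterMap);
      }
    }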

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Sun Jan 26 16:34:30 2014
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.common;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -26,26 +25,23 @@ import java.lang.management.ManagementFa
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.util.VersionInfo;
 
-import com.google.common.base.Preconditions;
-
 import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
 
 
 
@@ -82,7 +78,6 @@ public abstract class Storage extends St
   public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
 
   public    static final String STORAGE_FILE_LOCK     = "in_use.lock";
-  protected static final String STORAGE_FILE_VERSION  = "VERSION";
   public    static final String STORAGE_DIR_CURRENT   = "current";
   public    static final String STORAGE_DIR_PREVIOUS  = "previous";
   public    static final String STORAGE_TMP_REMOVED   = "removed.tmp";
@@ -126,22 +121,24 @@ public abstract class Storage extends St
   
   private class DirIterator implements Iterator<StorageDirectory> {
     StorageDirType dirType;
+    boolean includeShared;
     int prevIndex; // for remove()
     int nextIndex; // for next()
     
-    DirIterator(StorageDirType dirType) {
+    DirIterator(StorageDirType dirType, boolean includeShared) {
       this.dirType = dirType;
       this.nextIndex = 0;
       this.prevIndex = 0;
+      this.includeShared = includeShared;
     }
     
     @Override
     public boolean hasNext() {
       if (storageDirs.isEmpty() || nextIndex >= storageDirs.size())
         return false;
-      if (dirType != null) {
+      if (dirType != null || !includeShared) {
         while (nextIndex < storageDirs.size()) {
-          if (getStorageDir(nextIndex).getStorageDirType().isOfType(dirType))
+          if (shouldReturnNextDir())
             break;
           nextIndex++;
         }
@@ -156,9 +153,9 @@ public abstract class Storage extends St
       StorageDirectory sd = getStorageDir(nextIndex);
       prevIndex = nextIndex;
       nextIndex++;
-      if (dirType != null) {
+      if (dirType != null || !includeShared) {
         while (nextIndex < storageDirs.size()) {
-          if (getStorageDir(nextIndex).getStorageDirType().isOfType(dirType))
+          if (shouldReturnNextDir())
             break;
           nextIndex++;
         }
@@ -172,6 +169,12 @@ public abstract class Storage extends St
       storageDirs.remove(prevIndex); // remove last returned element
       hasNext(); // reset nextIndex to correct place
     }
+    
+    private boolean shouldReturnNextDir() {
+      StorageDirectory sd = getStorageDir(nextIndex);
+      return (dirType == null || sd.getStorageDirType().isOfType(dirType)) &&
+          (includeShared || !sd.isShared());
+    }
   }
   
   /**
@@ -203,7 +206,27 @@ public abstract class Storage extends St
    * them via the Iterator
    */
   public Iterator<StorageDirectory> dirIterator(StorageDirType dirType) {
-    return new DirIterator(dirType);
+    return dirIterator(dirType, true);
+  }
+  
+  /**
+   * Return all entries in storageDirs, potentially excluding shared dirs.
+   * @param includeShared whether or not to include shared dirs.
+   * @return an iterator over the configured storage dirs.
+   */
+  public Iterator<StorageDirectory> dirIterator(boolean includeShared) {
+    return dirIterator(null, includeShared);
+  }
+  
+  /**
+   * @param dirType all entries will be of this type of dir
+   * @param includeShared true to include any shared directories,
+   *        false otherwise
+   * @return an iterator over the configured storage dirs.
+   */
+  public Iterator<StorageDirectory> dirIterator(StorageDirType dirType,
+      boolean includeShared) {
+    return new DirIterator(dirType, includeShared);
   }
   
   public Iterable<StorageDirectory> dirIterable(final StorageDirType dirType) {
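
The includeShared flag threaded through DirIterator above lets callers walk
only the storage directories this node exclusively owns, which matters once an
edits directory can be shared between two NameNodes for HA: a shared directory
must not be locked or upgraded by both. Typical use (an illustrative fragment;
storage is any Storage instance):

    // Iterate only the non-shared directories.
    for (Iterator<StorageDirectory> it = storage.dirIterator(false);
        it.hasNext();) {
      StorageDirectory sd = it.next();
      // sd is private to this node: safe to lock, upgrade, or finalize.
    }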
@@ -233,7 +256,9 @@ public abstract class Storage extends St
   @InterfaceAudience.Private
   public static class StorageDirectory implements FormatConfirmable {
     final File root;              // root directory
-    final boolean useLock;        // flag to enable storage lock
+    // whether or not this dir is shared between two separate NNs for HA, or
+    // between multiple block pools in the case of federation.
+    final boolean isShared;
     final StorageDirType dirType; // storage dir type
     FileLock lock;                // storage lock
 
@@ -241,11 +266,11 @@ public abstract class Storage extends St
     
     public StorageDirectory(File dir) {
       // default dirType is null
-      this(dir, null, true);
+      this(dir, null, false);
     }
     
     public StorageDirectory(File dir, StorageDirType dirType) {
-      this(dir, dirType, true);
+      this(dir, dirType, false);
     }
     
     public void setStorageUuid(String storageUuid) {
@@ -260,14 +285,14 @@ public abstract class Storage extends St
      * Constructor
      * @param dir directory corresponding to the storage
      * @param dirType storage directory type
-     * @param useLock true - enables locking on the storage directory and false
-     *          disables locking
+     * @param isShared whether or not this dir is shared between two NNs. true
+     *          disables locking on the storage directory, false enables locking
      */
-    public StorageDirectory(File dir, StorageDirType dirType, boolean useLock) {
+    public StorageDirectory(File dir, StorageDirType dirType, boolean isShared) {
       this.root = dir;
       this.lock = null;
       this.dirType = dirType;
-      this.useLock = useLock;
+      this.isShared = isShared;
     }
     
     /**
@@ -621,6 +646,10 @@ public abstract class Storage extends St
       
       return true;
     }
+    
+    public boolean isShared() {
+      return isShared;
+    }
 
 
     /**
@@ -635,7 +664,7 @@ public abstract class Storage extends St
      * @throws IOException if locking fails
      */
     public void lock() throws IOException {
-      if (!useLock) {
+      if (isShared()) {
         LOG.info("Locking is disabled");
         return;
       }
@@ -890,22 +919,6 @@ public abstract class Storage extends St
   }
   
   /**
-   * Get common storage fields.
-   * Should be overloaded if additional fields need to be get.
-   * 
-   * @param props
-   * @throws IOException
-   */
-  protected void setFieldsFromProperties(
-      Properties props, StorageDirectory sd) throws IOException {
-    setLayoutVersion(props, sd);
-    setNamespaceID(props, sd);
-    setStorageType(props, sd);
-    setcTime(props, sd);
-    setClusterId(props, layoutVersion, sd);
-  }
-  
-  /**
    * Set common storage fields into the given properties object.
    * Should be overloaded if additional fields need to be set.
    * 
@@ -923,22 +936,29 @@ public abstract class Storage extends St
     }
     props.setProperty("cTime", String.valueOf(cTime));
   }
-
+  
   /**
-   * Read properties from the VERSION file in the given storage directory.
+   * Get common storage fields.
+   * Should be overridden if additional fields need to be read.
+   * 
+   * @param props
+   * @throws IOException
    */
-  public void readProperties(StorageDirectory sd) throws IOException {
-    Properties props = readPropertiesFile(sd.getVersionFile());
-    setFieldsFromProperties(props, sd);
+  protected void setFieldsFromProperties(
+      Properties props, StorageDirectory sd) throws IOException {
+    super.setFieldsFromProperties(props, sd);
+    setStorageType(props, sd);
   }
-
-  /**
-   * Read properties from the the previous/VERSION file in the given storage directory.
-   */
-  public void readPreviousVersionProperties(StorageDirectory sd)
-      throws IOException {
-    Properties props = readPropertiesFile(sd.getPreviousVersionFile());
-    setFieldsFromProperties(props, sd);
+  
+  /** Validate and set storage type from {@link Properties}*/
+  protected void setStorageType(Properties props, StorageDirectory sd)
+      throws InconsistentFSStateException {
+    NodeType type = NodeType.valueOf(getProperty(props, sd, "storageType"));
+    if (!storageType.equals(type)) {
+      throw new InconsistentFSStateException(sd.root,
+          "node type is incompatible with others.");
+    }
+    storageType = type;
   }
 
   /**
@@ -947,10 +967,15 @@ public abstract class Storage extends St
   public void writeProperties(StorageDirectory sd) throws IOException {
     writeProperties(sd.getVersionFile(), sd);
   }
-
+  
   public void writeProperties(File to, StorageDirectory sd) throws IOException {
     Properties props = new Properties();
     setPropertiesFromFields(props, sd);
+    writeProperties(to, sd, props);
+  }
+
+  public static void writeProperties(File to, StorageDirectory sd,
+      Properties props) throws IOException {
     RandomAccessFile file = new RandomAccessFile(to, "rws");
     FileOutputStream out = null;
     try {
@@ -977,23 +1002,6 @@ public abstract class Storage extends St
       file.close();
     }
   }
-  
-  public static Properties readPropertiesFile(File from) throws IOException {
-    RandomAccessFile file = new RandomAccessFile(from, "rws");
-    FileInputStream in = null;
-    Properties props = new Properties();
-    try {
-      in = new FileInputStream(file.getFD());
-      file.seek(0);
-      props.load(in);
-    } finally {
-      if (in != null) {
-        in.close();
-      }
-      file.close();
-    }
-    return props;
-  }
 
   public static void rename(File from, File to) throws IOException {
     if (!from.renameTo(to))
@@ -1044,69 +1052,6 @@ public abstract class Storage extends St
       + "-" + Long.toString(storage.getCTime());
   }
   
-  String getProperty(Properties props, StorageDirectory sd,
-      String name) throws InconsistentFSStateException {
-    String property = props.getProperty(name);
-    if (property == null) {
-      throw new InconsistentFSStateException(sd.root, "file "
-          + STORAGE_FILE_VERSION + " has " + name + " missing.");
-    }
-    return property;
-  }
-  
-  /** Validate and set storage type from {@link Properties}*/
-  protected void setStorageType(Properties props, StorageDirectory sd)
-      throws InconsistentFSStateException {
-    NodeType type = NodeType.valueOf(getProperty(props, sd, "storageType"));
-    if (!storageType.equals(type)) {
-      throw new InconsistentFSStateException(sd.root,
-          "node type is incompatible with others.");
-    }
-    storageType = type;
-  }
-  
-  /** Validate and set ctime from {@link Properties}*/
-  protected void setcTime(Properties props, StorageDirectory sd)
-      throws InconsistentFSStateException {
-    cTime = Long.parseLong(getProperty(props, sd, "cTime"));
-  }
-
-  /** Validate and set clusterId from {@link Properties}*/
-  protected void setClusterId(Properties props, int layoutVersion,
-      StorageDirectory sd) throws InconsistentFSStateException {
-    // Set cluster ID in version that supports federation
-    if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
-      String cid = getProperty(props, sd, "clusterID");
-      if (!(clusterID.equals("") || cid.equals("") || clusterID.equals(cid))) {
-        throw new InconsistentFSStateException(sd.getRoot(),
-            "cluster Id is incompatible with others.");
-      }
-      clusterID = cid;
-    }
-  }
-  
-  /** Validate and set layout version from {@link Properties}*/
-  protected void setLayoutVersion(Properties props, StorageDirectory sd)
-      throws IncorrectVersionException, InconsistentFSStateException {
-    int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion"));
-    if (lv < HdfsConstants.LAYOUT_VERSION) { // future version
-      throw new IncorrectVersionException(lv, "storage directory "
-          + sd.root.getAbsolutePath());
-    }
-    layoutVersion = lv;
-  }
-  
-  /** Validate and set namespaceID version from {@link Properties}*/
-  protected void setNamespaceID(Properties props, StorageDirectory sd)
-      throws InconsistentFSStateException {
-    int nsId = Integer.parseInt(getProperty(props, sd, "namespaceID"));
-    if (namespaceID != 0 && nsId != 0 && namespaceID != nsId) {
-      throw new InconsistentFSStateException(sd.root,
-          "namespaceID is incompatible with others.");
-    }
-    namespaceID = nsId;
-  }
-  
   public static boolean is203LayoutVersion(int layoutVersion) {
     for (int lv203 : LAYOUT_VERSIONS_203) {
       if (lv203 == layoutVersion) {

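A note on the property I/O refactored above: writeProperties() keeps the
seek-store-truncate pattern over a single RandomAccessFile, and the matching
readPropertiesFile() moves to StorageInfo below. A condensed, self-contained
sketch of that round trip (a minimal sketch; the class name and field values
are illustrative):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.Properties;

    public class VersionFileSketch {
      /** Overwrite the file in place, truncating stale trailing bytes. */
      static void write(File to, Properties props) throws IOException {
        RandomAccessFile file = new RandomAccessFile(to, "rws");
        FileOutputStream out = null;
        try {
          file.seek(0);
          out = new FileOutputStream(file.getFD());
          props.store(out, null);
          file.setLength(out.getChannel().position());
        } finally {
          if (out != null) {
            out.close();
          }
          file.close();
        }
      }

      static Properties read(File from) throws IOException {
        Properties props = new Properties();
        RandomAccessFile file = new RandomAccessFile(from, "rws");
        FileInputStream in = null;
        try {
          in = new FileInputStream(file.getFD());
          file.seek(0);
          props.load(in);
        } finally {
          if (in != null) {
            in.close();
          }
          file.close();
        }
        return props;
      }

      public static void main(String[] args) throws IOException {
        File f = File.createTempFile("VERSION", null);
        Properties p = new Properties();
        p.setProperty("layoutVersion", "-47");    // illustrative values
        p.setProperty("namespaceID", "123456");
        p.setProperty("cTime", "0");
        p.setProperty("storageType", "NAME_NODE");
        write(f, p);
        System.out.println(read(f));
      }
    }
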
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java Sun Jan 26 16:34:30 2014
@@ -17,9 +17,17 @@
  */
 package org.apache.hadoop.hdfs.server.common;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Properties;
+
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 
 import com.google.common.base.Joiner;
 
@@ -34,6 +42,8 @@ public class StorageInfo {
   public int   namespaceID;     // id of the file system
   public String clusterID;      // id of the cluster
   public long  cTime;           // creation time of the file system state
+  
+  protected static final String STORAGE_FILE_VERSION    = "VERSION";
  
   public StorageInfo () {
     this(0, 0, "", 0L);
@@ -96,4 +106,113 @@ public class StorageInfo {
     return Joiner.on(":").join(
         layoutVersion, namespaceID, cTime, clusterID);
   }
+  
+  /** Parse the namespaceID from the output of {@link #toColonSeparatedString()}. */
+  public static int getNsIdFromColonSeparatedString(String in) {
+    return Integer.parseInt(in.split(":")[1]);
+  }
+  
+  /** Parse the clusterID from the output of {@link #toColonSeparatedString()}. */
+  public static String getClusterIdFromColonSeparatedString(String in) {
+    return in.split(":")[3];
+  }
+  
+  /**
+   * Read properties from the VERSION file in the given storage directory.
+   */
+  public void readProperties(StorageDirectory sd) throws IOException {
+    Properties props = readPropertiesFile(sd.getVersionFile());
+    setFieldsFromProperties(props, sd);
+  }
+  
+  /**
+   * Read properties from the previous/VERSION file in the given storage directory.
+   */
+  public void readPreviousVersionProperties(StorageDirectory sd)
+      throws IOException {
+    Properties props = readPropertiesFile(sd.getPreviousVersionFile());
+    setFieldsFromProperties(props, sd);
+  }
+  
+  /**
+   * Set the common storage fields by reading them from the given properties.
+   * Should be overridden if additional fields need to be read.
+   * 
+   * @param props properties loaded from the VERSION file
+   * @throws IOException if a field is missing or inconsistent
+   */
+  protected void setFieldsFromProperties(
+      Properties props, StorageDirectory sd) throws IOException {
+    setLayoutVersion(props, sd);
+    setNamespaceID(props, sd);
+    setcTime(props, sd);
+    setClusterId(props, layoutVersion, sd);
+  }
+  
+  /** Validate and set ctime from {@link Properties}*/
+  protected void setcTime(Properties props, StorageDirectory sd)
+      throws InconsistentFSStateException {
+    cTime = Long.parseLong(getProperty(props, sd, "cTime"));
+  }
+
+  /** Validate and set clusterId from {@link Properties}*/
+  protected void setClusterId(Properties props, int layoutVersion,
+      StorageDirectory sd) throws InconsistentFSStateException {
+    // Set cluster ID in version that supports federation
+    if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
+      String cid = getProperty(props, sd, "clusterID");
+      if (!(clusterID.equals("") || cid.equals("") || clusterID.equals(cid))) {
+        throw new InconsistentFSStateException(sd.getRoot(),
+            "cluster Id is incompatible with others.");
+      }
+      clusterID = cid;
+    }
+  }
+  
+  /** Validate and set layout version from {@link Properties}*/
+  protected void setLayoutVersion(Properties props, StorageDirectory sd)
+      throws IncorrectVersionException, InconsistentFSStateException {
+    int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion"));
+    if (lv < HdfsConstants.LAYOUT_VERSION) { // future version
+      throw new IncorrectVersionException(lv, "storage directory "
+          + sd.root.getAbsolutePath());
+    }
+    layoutVersion = lv;
+  }
+  
+  /** Validate and set namespaceID from {@link Properties}*/
+  protected void setNamespaceID(Properties props, StorageDirectory sd)
+      throws InconsistentFSStateException {
+    int nsId = Integer.parseInt(getProperty(props, sd, "namespaceID"));
+    if (namespaceID != 0 && nsId != 0 && namespaceID != nsId) {
+      throw new InconsistentFSStateException(sd.root,
+          "namespaceID is incompatible with others.");
+    }
+    namespaceID = nsId;
+  }
+  
+  static String getProperty(Properties props, StorageDirectory sd,
+      String name) throws InconsistentFSStateException {
+    String property = props.getProperty(name);
+    if (property == null) {
+      throw new InconsistentFSStateException(sd.root, "file "
+          + STORAGE_FILE_VERSION + " has " + name + " missing.");
+    }
+    return property;
+  }
+  
+  public static Properties readPropertiesFile(File from) throws IOException {
+    RandomAccessFile file = new RandomAccessFile(from, "rws");
+    FileInputStream in = null;
+    Properties props = new Properties();
+    try {
+      in = new FileInputStream(file.getFD());
+      file.seek(0);
+      props.load(in);
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+      file.close();
+    }
+    return props;
+  }
 }

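Usage note for the two new static parsers above: they assume the field order
produced by toColonSeparatedString(), i.e.
layoutVersion:namespaceID:cTime:clusterID. A tiny hypothetical round trip
(the literal values are made up):

    public class ColonSeparatedExample {
      public static void main(String[] args) {
        // layoutVersion : namespaceID : cTime : clusterID
        String in = "-47:123456:0:CID-f81ebcf0";
        int nsId = Integer.parseInt(in.split(":")[1]);  // 123456
        String clusterId = in.split(":")[3];            // CID-f81ebcf0
        System.out.println(nsId + " " + clusterId);
      }
    }
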
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Sun Jan 26 16:34:30 2014
@@ -103,7 +103,7 @@ public class BlockPoolSliceStorage exten
         dataDirs.size());
     for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
       File dataDir = it.next();
-      StorageDirectory sd = new StorageDirectory(dataDir, null, false);
+      StorageDirectory sd = new StorageDirectory(dataDir, null, true);
       StorageState curState;
       try {
         curState = sd.analyzeStorage(startOpt, this);

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java Sun Jan 26 16:34:30 2014
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.RandomAccessFile;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DU;
@@ -191,7 +192,7 @@ class BlockPoolSlice {
             blockFile.length(), genStamp, volume, blockFile.getParentFile());
       } else {
         newReplica = new ReplicaWaitingToBeRecovered(blockId,
-            validateIntegrity(blockFile, genStamp), 
+            validateIntegrityAndSetLength(blockFile, genStamp), 
             genStamp, volume, blockFile.getParentFile());
       }
 
@@ -214,7 +215,7 @@ class BlockPoolSlice {
    * @param genStamp generation stamp of the block
    * @return the number of valid bytes
    */
-  private long validateIntegrity(File blockFile, long genStamp) {
+  private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
     DataInputStream checksumIn = null;
     InputStream blockIn = null;
     try {
@@ -257,11 +258,25 @@ class BlockPoolSlice {
       IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
 
       checksum.update(buf, 0, lastChunkSize);
+      long validFileLength;
       if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
-        return lastChunkStartPos + lastChunkSize;
+        validFileLength = lastChunkStartPos + lastChunkSize;
      } else { // last chunk is corrupt
-        return lastChunkStartPos;
+        validFileLength = lastChunkStartPos;
       }
+
+      // truncate if extra bytes are present without CRC
+      if (blockFile.length() > validFileLength) {
+        RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
+        try {
+          // truncate blockFile
+          blockRAF.setLength(validFileLength);
+        } finally {
+          blockRAF.close();
+        }
+      }
+
+      return validFileLength;
     } catch (IOException e) {
       FsDatasetImpl.LOG.warn(e);
       return 0;

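The rename above reflects a behavior change: the method no longer just reports
how many bytes pass the checksum, it also shrinks the block file to that
length. A self-contained sketch of only the truncation step (the class name
and the values in main() are illustrative; validFileLength would come from the
CRC comparison in the real method):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class TruncateSketch {
      /** Drop trailing bytes that are not covered by a checksum. */
      static void truncateToValidLength(File blockFile, long validFileLength)
          throws IOException {
        if (blockFile.length() > validFileLength) {
          RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
          try {
            blockRAF.setLength(validFileLength);
          } finally {
            blockRAF.close();
          }
        }
      }

      public static void main(String[] args) throws IOException {
        File f = File.createTempFile("blk_", ".data");
        RandomAccessFile raf = new RandomAccessFile(f, "rw");
        raf.setLength(1024);              // 1024 bytes on disk ...
        raf.close();
        truncateToValidLength(f, 1000);   // ... only 1000 verified by CRC
        System.out.println(f.length());   // prints 1000
      }
    }
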
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java Sun Jan 26 16:34:30 2014
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.util.Collection;
 
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -97,4 +99,35 @@ class BackupJournalManager implements Jo
   public String toString() {
     return "BackupJournalManager";
   }
+  
+  @Override
+  public void doPreUpgrade() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void doUpgrade(Storage storage) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public void doFinalize() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+      int targetLayoutVersion) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void doRollback() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getJournalCTime() throws IOException {
+    throw new UnsupportedOperationException();
+  }
 }

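BackupJournalManager opts out of every upgrade hook by throwing
UnsupportedOperationException. Whether callers guard against that is not shown
in this patch; the standalone model below (all names illustrative, not HDFS
API) only demonstrates the opt-out pattern itself:

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    public class UpgradeHookModel {
      interface Journal {
        void doPreUpgrade() throws IOException;
      }

      public static void main(String[] args) throws IOException {
        Journal file = new Journal() {
          public void doPreUpgrade() {
            System.out.println("pre-upgrade: file journal");
          }
        };
        Journal backup = new Journal() {
          public void doPreUpgrade() {
            throw new UnsupportedOperationException();
          }
        };
        List<Journal> journals = Arrays.asList(file, backup);
        for (Journal j : journals) {
          try {
            j.doPreUpgrade();
          } catch (UnsupportedOperationException e) {
            System.out.println("journal does not take part in upgrades");
          }
        }
      }
    }
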
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Sun Jan 26 16:34:30 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
 import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
@@ -367,7 +368,7 @@ public class BackupNode extends NameNode
     } else {
       nsInfo.validateStorage(storage);
     }
-    bnImage.initEditLog();
+    bnImage.initEditLog(StartupOption.REGULAR);
     setRegistration();
     NamenodeRegistration nnReg = null;
     while(!isStopRequested()) {
@@ -423,7 +424,8 @@ public class BackupNode extends NameNode
     return DFSUtil.getBackupNameServiceId(conf);
   }
 
-  protected HAState createHAState() {
+  @Override
+  protected HAState createHAState(StartupOption startOpt) {
     return new BackupState();
   }
 

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Sun Jan 26 16:34:30 2014
@@ -174,7 +174,6 @@ public class FSDirectory implements Clos
         DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
     this.lsLimit = configuredLimit>0 ?
         configuredLimit : DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT;
-
     this.contentCountLimit = conf.getInt(
         DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY,
         DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_DEFAULT);
@@ -1490,6 +1489,11 @@ public class FSDirectory implements Clos
   /**
    * Get a partial listing of the indicated directory
    *
+   * We will stop when any of the following conditions is met:
+   * 1) this.lsLimit files have been added
+   * 2) needLocation is true AND enough files have been added such
+   * that at least this.lsLimit block locations are in the response
+   *
    * @param src the directory name
    * @param startAfter the name to start listing after
    * @param needLocation if block locations are returned
@@ -1521,14 +1525,30 @@ public class FSDirectory implements Clos
       int startChild = INodeDirectory.nextChild(contents, startAfter);
       int totalNumChildren = contents.size();
       int numOfListing = Math.min(totalNumChildren-startChild, this.lsLimit);
+      int locationBudget = this.lsLimit;
+      int listingCnt = 0;
       HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
-      for (int i=0; i<numOfListing; i++) {
+      for (int i=0; i<numOfListing && locationBudget>0; i++) {
         INode cur = contents.get(startChild+i);
         listing[i] = createFileStatus(cur.getLocalNameBytes(), cur,
             needLocation, snapshot);
+        listingCnt++;
+        if (needLocation) {
+          // Once we hit lsLimit locations, stop.
+          // This helps to prevent excessively large response payloads.
+          // Approximate #locations with locatedBlockCount() * repl_factor
+          LocatedBlocks blks =
+              ((HdfsLocatedFileStatus) listing[i]).getBlockLocations();
+          locationBudget -= (blks == null) ? 0 :
+              blks.locatedBlockCount() * listing[i].getReplication();
+        }
+      }
+      // truncate the return array if necessary
+      if (listingCnt < numOfListing) {
+        listing = Arrays.copyOf(listing, listingCnt);
       }
       return new DirectoryListing(
-          listing, totalNumChildren-startChild-numOfListing);
+          listing, totalNumChildren-startChild-listingCnt);
     } finally {
       readUnlock();
     }

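To make the budget arithmetic above concrete: each listed file costs roughly
locatedBlockCount() * getReplication() locations, and the loop stops once the
budget is spent. A hypothetical worked example (block count and replication
are made-up values):

    public class LocationBudgetExample {
      public static void main(String[] args) {
        int lsLimit = 1000;       // DFSConfigKeys.DFS_LIST_LIMIT
        int blocksPerFile = 4;    // illustrative
        int replication = 3;      // illustrative
        int locationBudget = lsLimit;
        int listingCnt = 0;
        while (locationBudget > 0) {
          listingCnt++;
          locationBudget -= blocksPerFile * replication;  // cost per entry
        }
        // 84 entries instead of lsLimit = 1000; the rest is left for the
        // client to fetch via the remaining-entries count in the listing.
        System.out.println(listingCnt);
      }
    }
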
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1561516&r1=1561515&r2=1561516&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Sun Jan 26 16:34:30 2014
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
@@ -253,10 +254,12 @@ public class FSEditLog implements LogsPu
       if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
         StorageDirectory sd = storage.getStorageDirectory(u);
         if (sd != null) {
-          journalSet.add(new FileJournalManager(conf, sd, storage), required);
+          journalSet.add(new FileJournalManager(conf, sd, storage),
+              required, sharedEditsDirs.contains(u));
         }
       } else {
-        journalSet.add(createJournal(u), required);
+        journalSet.add(createJournal(u), required,
+            sharedEditsDirs.contains(u));
       }
     }
  
@@ -1346,6 +1349,58 @@ public class FSEditLog implements LogsPu
     }
   }
   
+  public long getSharedLogCTime() throws IOException {
+    for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+      if (jas.isShared()) {
+        return jas.getManager().getJournalCTime();
+      }
+    }
+    throw new IOException("No shared log found.");
+  }
+  
+  public synchronized void doPreUpgradeOfSharedLog() throws IOException {
+    for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+      if (jas.isShared()) {
+        jas.getManager().doPreUpgrade();
+      }
+    }
+  }
+  
+  public synchronized void doUpgradeOfSharedLog() throws IOException {
+    for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+      if (jas.isShared()) {
+        jas.getManager().doUpgrade(storage);
+      }
+    }
+  }
+  
+  public synchronized void doFinalizeOfSharedLog() throws IOException {
+    for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+      if (jas.isShared()) {
+        jas.getManager().doFinalize();
+      }
+    }
+  }
+  
+  public synchronized boolean canRollBackSharedLog(Storage prevStorage,
+      int targetLayoutVersion) throws IOException {
+    for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+      if (jas.isShared()) {
+        return jas.getManager().canRollBack(storage, prevStorage,
+            targetLayoutVersion);
+      }
+    }
+    throw new IOException("No shared log found.");
+  }
+  
+  public synchronized void doRollback() throws IOException {
+    for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+      if (jas.isShared()) {
+        jas.getManager().doRollback();
+      }
+    }
+  }
+  
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxId, boolean inProgressOk) throws IOException {
@@ -1477,4 +1532,5 @@ public class FSEditLog implements LogsPu
                                          + uri, e);
     }
   }
+
 }

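All six new FSEditLog methods follow one pattern: scan the journal streams,
act on the first one flagged shared, and fail if none exists. A standalone
model of that pattern (Journal and Jas are simplified stand-ins, not the HDFS
types):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    public class SharedLogModel {
      interface Journal {
        long getJournalCTime() throws IOException;
      }

      static class Jas {
        final boolean shared;
        final Journal journal;
        Jas(boolean shared, Journal journal) {
          this.shared = shared;
          this.journal = journal;
        }
      }

      /** Mirrors getSharedLogCTime(): the first shared journal wins. */
      static long getSharedLogCTime(List<Jas> streams) throws IOException {
        for (Jas jas : streams) {
          if (jas.shared) {
            return jas.journal.getJournalCTime();
          }
        }
        throw new IOException("No shared log found.");
      }

      public static void main(String[] args) throws IOException {
        Journal qjm = new Journal() {
          public long getJournalCTime() {
            return 42L;
          }
        };
        List<Jas> streams =
            Arrays.asList(new Jas(false, null), new Jas(true, qjm));
        System.out.println(getSharedLogCTime(streams));  // prints 42
      }
    }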

