hadoop-hdfs-commits mailing list archives

From cnaur...@apache.org
Subject svn commit: r1550774 [2/10] - in /hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs: ./ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apach...
Date Fri, 13 Dec 2013 17:28:18 GMT
Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Dec 13 17:28:14 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
@@ -57,12 +58,12 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclStatusProto;
@@ -133,6 +134,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryListingProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -264,17 +267,20 @@ public class PBHelper {
 
   // DatanodeId
   public static DatanodeID convert(DatanodeIDProto dn) {
-    return new DatanodeID(dn.getIpAddr(), dn.getHostName(), dn.getStorageID(),
+    return new DatanodeID(dn.getIpAddr(), dn.getHostName(), dn.getDatanodeUuid(),
         dn.getXferPort(), dn.getInfoPort(), dn.hasInfoSecurePort() ? dn
         .getInfoSecurePort() : 0, dn.getIpcPort());
   }
 
   public static DatanodeIDProto convert(DatanodeID dn) {
+    // For wire compatibility with older versions we transmit the StorageID
+    // which is the same as the DatanodeUuid. Since StorageID is a required
+    // field we pass the empty string if the DatanodeUuid is not yet known.
     return DatanodeIDProto.newBuilder()
         .setIpAddr(dn.getIpAddr())
         .setHostName(dn.getHostName())
-        .setStorageID(dn.getStorageID())
         .setXferPort(dn.getXferPort())
+        .setDatanodeUuid(dn.getDatanodeUuid() != null ? dn.getDatanodeUuid() : "")
         .setInfoPort(dn.getInfoPort())
         .setInfoSecurePort(dn.getInfoSecurePort())
         .setIpcPort(dn.getIpcPort()).build();
@@ -316,12 +322,16 @@ public class PBHelper {
   public static BlockWithLocationsProto convert(BlockWithLocations blk) {
     return BlockWithLocationsProto.newBuilder()
         .setBlock(convert(blk.getBlock()))
-        .addAllStorageIDs(Arrays.asList(blk.getStorageIDs())).build();
+        .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
+        .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())).build();
   }
 
   public static BlockWithLocations convert(BlockWithLocationsProto b) {
-    return new BlockWithLocations(convert(b.getBlock()), b.getStorageIDsList()
-        .toArray(new String[0]));
+    final List<String> datanodeUuids = b.getDatanodeUuidsList();
+    final List<String> storageUuids = b.getStorageUuidsList();
+    return new BlockWithLocations(convert(b.getBlock()),
+        datanodeUuids.toArray(new String[datanodeUuids.size()]),
+        storageUuids.toArray(new String[storageUuids.size()]));
   }
 
   public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {
@@ -623,6 +633,17 @@ public class PBHelper {
         "Found additional cached replica locations that are not in the set of"
         + " storage-backed locations!");
 
+    StorageType[] storageTypes = b.getStorageTypes();
+    if (storageTypes != null) {
+      for (int i = 0; i < storageTypes.length; ++i) {
+        builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i]));
+      }
+    }
+    final String[] storageIDs = b.getStorageIDs();
+    if (storageIDs != null) {
+      builder.addAllStorageIDs(Arrays.asList(storageIDs));
+    }
+
     return builder.setB(PBHelper.convert(b.getBlock()))
         .setBlockToken(PBHelper.convert(b.getBlockToken()))
         .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
@@ -635,6 +656,25 @@ public class PBHelper {
     for (int i = 0; i < locs.size(); i++) {
       targets[i] = PBHelper.convert(locs.get(i));
     }
+
+    final int storageTypesCount = proto.getStorageTypesCount();
+    final StorageType[] storageTypes;
+    if (storageTypesCount == 0) {
+      storageTypes = null;
+    } else {
+      Preconditions.checkState(storageTypesCount == locs.size());
+      storageTypes = convertStorageTypeProtos(proto.getStorageTypesList());
+    }
+
+    final int storageIDsCount = proto.getStorageIDsCount();
+    final String[] storageIDs;
+    if (storageIDsCount == 0) {
+      storageIDs = null;
+    } else {
+      Preconditions.checkState(storageIDsCount == locs.size());
+      storageIDs = proto.getStorageIDsList().toArray(new String[storageIDsCount]);
+    }
+
     // Set values from the isCached list, re-using references from loc
     List<DatanodeInfo> cachedLocs = new ArrayList<DatanodeInfo>(locs.size());
     List<Boolean> isCachedList = proto.getIsCachedList();
@@ -645,7 +685,7 @@ public class PBHelper {
     }
 
     LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets,
-        proto.getOffset(), proto.getCorrupt(),
+        storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(),
         cachedLocs.toArray(new DatanodeInfo[0]));
     lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
 
@@ -789,7 +829,8 @@ public class PBHelper {
     for (int i = 0; i < blocks.length; i++) {
       builder.addBlocks(PBHelper.convert(blocks[i]));
     }
-    builder.addAllTargets(PBHelper.convert(cmd.getTargets()));
+    builder.addAllTargets(convert(cmd.getTargets()))
+           .addAllTargetStorageUuids(convert(cmd.getTargetStorageIDs()));
     return builder.build();
   }
   
@@ -822,6 +863,15 @@ public class PBHelper {
     return Arrays.asList(ret);
   }
 
+  private static List<StorageUuidsProto> convert(String[][] targetStorageUuids) {
+    StorageUuidsProto[] ret = new StorageUuidsProto[targetStorageUuids.length];
+    for (int i = 0; i < targetStorageUuids.length; i++) {
+      ret[i] = StorageUuidsProto.newBuilder()
+          .addAllStorageUuids(Arrays.asList(targetStorageUuids[i])).build();
+    }
+    return Arrays.asList(ret);
+  }
+
   public static DatanodeCommandProto convert(DatanodeCommand datanodeCommand) {
     DatanodeCommandProto.Builder builder = DatanodeCommandProto.newBuilder();
     if (datanodeCommand == null) {
@@ -901,6 +951,14 @@ public class PBHelper {
     for (int i = 0; i < targetList.size(); i++) {
       targets[i] = PBHelper.convert(targetList.get(i));
     }
+
+    List<StorageUuidsProto> targetStorageUuidsList = blkCmd.getTargetStorageUuidsList();
+    String[][] targetStorageIDs = new String[targetStorageUuidsList.size()][];
+    for(int i = 0; i < targetStorageIDs.length; i++) {
+      List<String> storageIDs = targetStorageUuidsList.get(i).getStorageUuidsList();
+      targetStorageIDs[i] = storageIDs.toArray(new String[storageIDs.size()]);
+    }
+
     int action = DatanodeProtocol.DNA_UNKNOWN;
     switch (blkCmd.getAction()) {
     case TRANSFER:
@@ -915,7 +973,8 @@ public class PBHelper {
     default:
       throw new AssertionError("Unknown action type: " + blkCmd.getAction());
     }
-    return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
+    return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets,
+        targetStorageIDs);
   }
 
   public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
@@ -1445,11 +1504,12 @@ public class PBHelper {
 
   public static DatanodeStorageProto convert(DatanodeStorage s) {
     return DatanodeStorageProto.newBuilder()
-        .setState(PBHelper.convert(s.getState()))
-        .setStorageID(s.getStorageID()).build();
+        .setState(PBHelper.convertState(s.getState()))
+        .setStorageType(PBHelper.convertStorageType(s.getStorageType()))
+        .setStorageUuid(s.getStorageID()).build();
   }
 
-  private static StorageState convert(State state) {
+  private static StorageState convertState(State state) {
     switch(state) {
     case READ_ONLY:
       return StorageState.READ_ONLY;
@@ -1459,11 +1519,26 @@ public class PBHelper {
     }
   }
 
+  private static StorageTypeProto convertStorageType(
+      StorageType type) {
+    switch(type) {
+    case DISK:
+      return StorageTypeProto.DISK;
+    case SSD:
+      return StorageTypeProto.SSD;
+    default:
+      throw new IllegalStateException(
+          "BUG: StorageType not found, type=" + type);
+    }
+  }
+
   public static DatanodeStorage convert(DatanodeStorageProto s) {
-    return new DatanodeStorage(s.getStorageID(), PBHelper.convert(s.getState()));
+    return new DatanodeStorage(s.getStorageUuid(),
+                               PBHelper.convertState(s.getState()),
+                               PBHelper.convertType(s.getStorageType()));
   }
 
-  private static State convert(StorageState state) {
+  private static State convertState(StorageState state) {
     switch(state) {
     case READ_ONLY:
       return DatanodeStorage.State.READ_ONLY;
@@ -1473,14 +1548,50 @@ public class PBHelper {
     }
   }
 
+  private static StorageType convertType(StorageTypeProto type) {
+    switch(type) {
+      case DISK:
+        return StorageType.DISK;
+      case SSD:
+        return StorageType.SSD;
+      default:
+        throw new IllegalStateException(
+            "BUG: StorageTypeProto not found, type=" + type);
+    }
+  }
+
+  private static StorageType[] convertStorageTypeProtos(
+      List<StorageTypeProto> storageTypesList) {
+    final StorageType[] storageTypes = new StorageType[storageTypesList.size()];
+    for (int i = 0; i < storageTypes.length; ++i) {
+      storageTypes[i] = PBHelper.convertType(storageTypesList.get(i));
+    }
+    return storageTypes;
+  }
+
   public static StorageReportProto convert(StorageReport r) {
     StorageReportProto.Builder builder = StorageReportProto.newBuilder()
         .setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())
         .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
-        .setStorageID(r.getStorageID());
+        .setStorageUuid(r.getStorageID());
     return builder.build();
   }
 
+  public static StorageReport convert(StorageReportProto p) {
+    return new StorageReport(p.getStorageUuid(), p.getFailed(),
+        p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
+        p.getBlockPoolUsed());
+  }
+
+  public static StorageReport[] convertStorageReports(
+      List<StorageReportProto> list) {
+    final StorageReport[] report = new StorageReport[list.size()];
+    for (int i = 0; i < report.length; i++) {
+      report[i] = convert(list.get(i));
+    }
+    return report;
+  }
+
   public static JournalInfo convert(JournalInfoProto info) {
     int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
     int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
@@ -1838,3 +1949,4 @@ public class PBHelper {
     return GetAclStatusResponseProto.newBuilder().setResult(r).build();
   }
 }
+
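
The storage-type plumbing above follows the usual protobuf enum-mapping pattern: a
pair of exhaustive switches translating between the domain enum and its wire enum,
throwing on any unmapped constant so a skew between the two definitions fails fast.
A minimal standalone sketch of that pattern (the enums below are local stand-ins,
not the real StorageType/StorageTypeProto classes):

    // Stand-ins for org.apache.hadoop.hdfs.StorageType and the generated
    // HdfsProtos.StorageTypeProto; defined locally so the sketch compiles alone.
    enum StorageType { DISK, SSD }
    enum StorageTypeProto { DISK, SSD }

    public class StorageTypeMapping {
      static StorageTypeProto toProto(StorageType type) {
        switch (type) {
        case DISK: return StorageTypeProto.DISK;
        case SSD:  return StorageTypeProto.SSD;
        default:
          throw new IllegalStateException("BUG: StorageType not found, type=" + type);
        }
      }

      static StorageType fromProto(StorageTypeProto proto) {
        switch (proto) {
        case DISK: return StorageType.DISK;
        case SSD:  return StorageType.SSD;
        default:
          throw new IllegalStateException("BUG: StorageTypeProto not found, type=" + proto);
        }
      }

      public static void main(String[] args) {
        // Every constant must survive the round trip unchanged.
        for (StorageType t : StorageType.values()) {
          System.out.println(t + " <-> " + toProto(t) + " <-> " + fromProto(toProto(t)));
        }
      }
    }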

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java Fri Dec 13 17:28:14 2013
@@ -109,7 +109,7 @@ interface AsyncLogger {
    * Fetch the list of edit logs available on the remote node.
    */
   public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
-      long fromTxnId, boolean forReading, boolean inProgressOk);
+      long fromTxnId, boolean inProgressOk);
 
   /**
    * Prepare recovery. See the HDFS-3077 design document for details.

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java Fri Dec 13 17:28:14 2013
@@ -261,13 +261,13 @@ class AsyncLoggerSet {
   }
 
   public QuorumCall<AsyncLogger, RemoteEditLogManifest> getEditLogManifest(
-      long fromTxnId, boolean forReading, boolean inProgressOk) {
+      long fromTxnId, boolean inProgressOk) {
     Map<AsyncLogger,
         ListenableFuture<RemoteEditLogManifest>> calls
         = Maps.newHashMap();
     for (AsyncLogger logger : loggers) {
       ListenableFuture<RemoteEditLogManifest> future =
-          logger.getEditLogManifest(fromTxnId, forReading, inProgressOk);
+          logger.getEditLogManifest(fromTxnId, inProgressOk);
       calls.put(logger, future);
     }
     return QuorumCall.create(calls);
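
The getEditLogManifest change above keeps AsyncLoggerSet's fan-out shape: issue the
same call to every logger, keep one future per logger in a map, and hand the map to
QuorumCall to wait for a majority. A rough standalone sketch of that shape, using
CompletableFuture in place of Guava's ListenableFuture and the real QuorumCall (all
names here are illustrative, not Hadoop APIs):

    import java.util.*;
    import java.util.concurrent.*;

    public class FanOutSketch {
      // Stand-in for AsyncLogger: each "logger" answers a manifest request.
      interface Logger {
        String getEditLogManifest(long fromTxnId, boolean inProgressOk);
      }

      // One future per logger, mirroring the Map<AsyncLogger, ListenableFuture<...>>
      // built in AsyncLoggerSet.getEditLogManifest above.
      static Map<Logger, CompletableFuture<String>> fanOut(
          List<Logger> loggers, ExecutorService executor,
          long fromTxnId, boolean inProgressOk) {
        Map<Logger, CompletableFuture<String>> calls = new HashMap<>();
        for (Logger logger : loggers) {
          calls.put(logger, CompletableFuture.supplyAsync(
              () -> logger.getEditLogManifest(fromTxnId, inProgressOk), executor));
        }
        return calls;
      }

      public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        List<Logger> loggers = Arrays.asList(
            (txid, ok) -> "manifest-from-jn1",
            (txid, ok) -> "manifest-from-jn2",
            (txid, ok) -> "manifest-from-jn3");
        // A real QuorumCall returns once a majority respond; here we join all.
        for (CompletableFuture<String> f :
            fanOut(loggers, pool, 100L, false).values()) {
          System.out.println(f.join());
        }
        pool.shutdown();
      }
    }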

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Fri Dec 13 17:28:14 2013
@@ -181,6 +181,7 @@ public class IPCLoggerChannel implements
   
   @Override
   public void close() {
+    QuorumJournalManager.LOG.info("Closing", new Exception());
     // No more tasks may be submitted after this point.
     executor.shutdown();
     if (proxy != null) {
@@ -520,13 +521,12 @@ public class IPCLoggerChannel implements
 
   @Override
   public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
-      final long fromTxnId, final boolean forReading, 
-      final boolean inProgressOk) {
+      final long fromTxnId, final boolean inProgressOk) {
     return executor.submit(new Callable<RemoteEditLogManifest>() {
       @Override
       public RemoteEditLogManifest call() throws IOException {
         GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
-            journalId, fromTxnId, forReading, inProgressOk);
+            journalId, fromTxnId, inProgressOk);
         // Update the http port, since we need this to build URLs to any of the
         // returned logs.
         constructHttpServerURI(ret);

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Fri Dec 13 17:28:14 2013
@@ -449,18 +449,13 @@ public class QuorumJournalManager implem
   public void close() throws IOException {
     loggers.close();
   }
-  
-  public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk) throws IOException {
-    selectInputStreams(streams, fromTxnId, inProgressOk, true);
-  }
 
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk, boolean forReading) throws IOException {
+      long fromTxnId, boolean inProgressOk) throws IOException {
 
     QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
-        loggers.getEditLogManifest(fromTxnId, forReading, inProgressOk);
+        loggers.getEditLogManifest(fromTxnId, inProgressOk);
     Map<AsyncLogger, RemoteEditLogManifest> resps =
         loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
             "selectInputStreams");

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java Fri Dec 13 17:28:14 2013
@@ -123,14 +123,12 @@ public interface QJournalProtocol {
   /**
    * @param jid the journal from which to enumerate edits
    * @param sinceTxId the first transaction which the client cares about
-   * @param forReading whether or not the caller intends to read from the edit
-   *        logs
    * @param inProgressOk whether or not to check the in-progress edit log 
    *        segment       
    * @return a list of edit log segments since the given transaction ID.
    */
   public GetEditLogManifestResponseProto getEditLogManifest(String jid,
-      long sinceTxId, boolean forReading, boolean inProgressOk)
+      long sinceTxId, boolean inProgressOk)
       throws IOException;
   
   /**

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java Fri Dec 13 17:28:14 2013
@@ -203,7 +203,6 @@ public class QJournalProtocolServerSideT
       return impl.getEditLogManifest(
           request.getJid().getIdentifier(),
           request.getSinceTxId(),
-          request.getForReading(),
           request.getInProgressOk());
     } catch (IOException e) {
       throw new ServiceException(e);

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java Fri Dec 13 17:28:14 2013
@@ -228,14 +228,13 @@ public class QJournalProtocolTranslatorP
 
   @Override
   public GetEditLogManifestResponseProto getEditLogManifest(String jid,
-      long sinceTxId, boolean forReading, boolean inProgressOk)
+      long sinceTxId, boolean inProgressOk)
       throws IOException {
     try {
       return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
           GetEditLogManifestRequestProto.newBuilder()
             .setJid(convertJournalId(jid))
             .setSinceTxId(sinceTxId)
-            .setForReading(forReading)
             .setInProgressOk(inProgressOk)
             .build());
     } catch (ServiceException e) {

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Fri Dec 13 17:28:14 2013
@@ -630,15 +630,12 @@ class Journal implements Closeable {
    * @see QJournalProtocol#getEditLogManifest(String, long)
    */
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
-      boolean forReading, boolean inProgressOk) throws IOException {
+      boolean inProgressOk) throws IOException {
     // No need to checkRequest() here - anyone may ask for the list
     // of segments.
     checkFormatted();
     
-    // if this is for reading, ignore the in-progress editlog segment
-    inProgressOk = forReading ? false : inProgressOk;
-    List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, forReading,
-        inProgressOk);
+    List<RemoteEditLog> logs = fjm.getRemoteEditLogs(sinceTxId, inProgressOk);
     
     if (inProgressOk) {
       RemoteEditLog log = null;

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Fri Dec 13 17:28:14 2013
@@ -178,11 +178,11 @@ class JournalNodeRpcServer implements QJ
   @SuppressWarnings("deprecation")
   @Override
   public GetEditLogManifestResponseProto getEditLogManifest(String jid,
-      long sinceTxId, boolean forReading, boolean inProgressOk)
+      long sinceTxId, boolean inProgressOk)
       throws IOException {
     
     RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid)
-        .getEditLogManifest(sinceTxId, forReading, inProgressOk);
+        .getEditLogManifest(sinceTxId, inProgressOk);
     
     return GetEditLogManifestResponseProto.newBuilder()
         .setManifest(PBHelper.convert(manifest))

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Fri Dec 13 17:28:14 2013
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static com.google.common.base.Preconditions.checkArgument;
-
 import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
 import java.io.BufferedInputStream;
@@ -221,9 +220,9 @@ public class Balancer {
   private Map<Block, BalancerBlock> globalBlockList
                  = new HashMap<Block, BalancerBlock>();
   private MovedBlocks movedBlocks = new MovedBlocks();
-  // Map storage IDs to BalancerDatanodes
-  private Map<String, BalancerDatanode> datanodes
-                 = new HashMap<String, BalancerDatanode>();
+  /** Map (datanodeUuid -> BalancerDatanode) */
+  private final Map<String, BalancerDatanode> datanodeMap
+      = new HashMap<String, BalancerDatanode>();
   
   private NetworkTopology cluster;
 
@@ -241,6 +240,14 @@ public class Balancer {
     private PendingBlockMove() {
     }
     
+    @Override
+    public String toString() {
+      final Block b = block.getBlock();
+      return b + " with size=" + b.getNumBytes() + " from "
+          + source.getDisplayName() + " to " + target.getDisplayName()
+          + " through " + proxySource.getDisplayName();
+    }
+
     /* choose a block & a proxy source for this pendingMove 
      * whose source & target have already been chosen.
      * 
@@ -272,11 +279,7 @@ public class Balancer {
             if ( chooseProxySource() ) {
               movedBlocks.add(block);
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Decided to move block "+ block.getBlockId()
-                    +" with a length of "+StringUtils.byteDesc(block.getNumBytes())
-                    + " bytes from " + source.getDisplayName()
-                    + " to " + target.getDisplayName()
-                    + " using proxy source " + proxySource.getDisplayName() );
+                LOG.debug("Decided to move " + this);
               }
               return true;
             }
@@ -292,26 +295,27 @@ public class Balancer {
      */
     private boolean chooseProxySource() {
       final DatanodeInfo targetDN = target.getDatanode();
-      boolean find = false;
-      for (BalancerDatanode loc : block.getLocations()) {
-        // check if there is replica which is on the same rack with the target
-        if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
-          find = true;
-          // if cluster is not nodegroup aware or the proxy is on the same 
-          // nodegroup with target, then we already find the nearest proxy
-          if (!cluster.isNodeGroupAware() 
-              || cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)) {
+      // if node group is supported, first try to add nodes in the same node group
+      if (cluster.isNodeGroupAware()) {
+        for (BalancerDatanode loc : block.getLocations()) {
+          if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) {
             return true;
           }
         }
-        
-        if (!find) {
-          // find out a non-busy replica out of rack of target
-          find = addTo(loc);
+      }
+      // check if there is replica which is on the same rack with the target
+      for (BalancerDatanode loc : block.getLocations()) {
+        if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
+          return true;
         }
       }
-      
-      return find;
+      // find out a non-busy replica
+      for (BalancerDatanode loc : block.getLocations()) {
+        if (addTo(loc)) {
+          return true;
+        }
+      }
+      return false;
     }
     
     // add a BalancerDatanode as proxy source for specific block movement
@@ -352,17 +356,9 @@ public class Balancer {
         sendRequest(out);
         receiveResponse(in);
         bytesMoved.inc(block.getNumBytes());
-        LOG.info( "Moving block " + block.getBlock().getBlockId() +
-              " from "+ source.getDisplayName() + " to " +
-              target.getDisplayName() + " through " +
-              proxySource.getDisplayName() +
-              " is succeeded." );
+        LOG.info("Successfully moved " + this);
       } catch (IOException e) {
-        LOG.warn("Error moving block "+block.getBlockId()+
-            " from " + source.getDisplayName() + " to " +
-            target.getDisplayName() + " through " +
-            proxySource.getDisplayName() +
-            ": "+e.getMessage());
+        LOG.warn("Failed to move " + this + ": " + e.getMessage());
       } finally {
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
@@ -414,9 +410,7 @@ public class Balancer {
         @Override
         public void run() {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Starting moving "+ block.getBlockId() +
-                " from " + proxySource.getDisplayName() + " to " +
-                target.getDisplayName());
+            LOG.debug("Start moving " + PendingBlockMove.this);
           }
           dispatch();
         }
@@ -463,11 +457,6 @@ public class Balancer {
       return block;
     }
     
-    /* Return the block id */
-    private long getBlockId() {
-      return block.getBlockId();
-    }
-    
     /* Return the length of the block */
     private long getNumBytes() {
       return block.getNumBytes();
@@ -551,7 +540,7 @@ public class Balancer {
     
     /* Get the storage id of the datanode */
     protected String getStorageID() {
-      return datanode.getStorageID();
+      return datanode.getDatanodeUuid();
     }
     
     /** Decide if still need to move more bytes */
@@ -674,10 +663,10 @@ public class Balancer {
         
           synchronized (block) {
             // update locations
-            for ( String storageID : blk.getStorageIDs() ) {
-              BalancerDatanode datanode = datanodes.get(storageID);
+            for (String datanodeUuid : blk.getDatanodeUuids()) {
+              final BalancerDatanode d = datanodeMap.get(datanodeUuid);
-              if (datanode != null) { // not an unknown datanode
-                block.addLocation(datanode);
+              if (d != null) { // not an unknown datanode
+                block.addLocation(d);
               }
             }
           }
@@ -851,16 +840,6 @@ public class Balancer {
                         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
   }
   
-  /* Shuffle datanode array */
-  static private void shuffleArray(DatanodeInfo[] datanodes) {
-    for (int i=datanodes.length; i>1; i--) {
-      int randomIndex = DFSUtil.getRandom().nextInt(i);
-      DatanodeInfo tmp = datanodes[randomIndex];
-      datanodes[randomIndex] = datanodes[i-1];
-      datanodes[i-1] = tmp;
-    }
-  }
-  
   /* Given a data node set, build a network topology and decide
    * over-utilized datanodes, above average utilized datanodes, 
    * below average utilized datanodes, and underutilized datanodes. 
@@ -890,8 +869,7 @@ public class Balancer {
      * an increasing order or a decreasing order.
      */  
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
-    shuffleArray(datanodes);
-    for (DatanodeInfo datanode : datanodes) {
+    for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) {
       if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
         continue; // ignore decommissioning or decommissioned nodes
       }
@@ -922,13 +900,13 @@ public class Balancer {
               datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
         }
       }
-      this.datanodes.put(datanode.getStorageID(), datanodeS);
+      datanodeMap.put(datanode.getDatanodeUuid(), datanodeS);
     }
 
     //logging
     logNodes();
     
-    assert (this.datanodes.size() == 
+    assert (this.datanodeMap.size() == 
       overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
       aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size())
       : "Mismatched number of datanodes";
@@ -1000,9 +978,9 @@ public class Balancer {
     // At last, match all remaining nodes
     chooseNodes(ANY_OTHER);
     
-    assert (datanodes.size() >= sources.size()+targets.size())
+    assert (datanodeMap.size() >= sources.size()+targets.size())
       : "Mismatched number of datanodes (" +
-      datanodes.size() + " total, " +
+      datanodeMap.size() + " total, " +
       sources.size() + " sources, " +
       targets.size() + " targets)";
 
@@ -1303,7 +1281,7 @@ public class Balancer {
     this.aboveAvgUtilizedDatanodes.clear();
     this.belowAvgUtilizedDatanodes.clear();
     this.underUtilizedDatanodes.clear();
-    this.datanodes.clear();
+    this.datanodeMap.clear();
     this.sources.clear();
     this.targets.clear();  
     this.policy.reset();
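
The chooseProxySource rewrite above turns one flag-tracking loop into three ordered
passes with early return: prefer a proxy in the same node group when the topology
supports it, then one on the same rack as the target, then any non-busy replica. A
standalone sketch of that ordered-preference scan (the Node type and its fields are
simplified stand-ins for the Balancer's classes):

    import java.util.Arrays;
    import java.util.List;

    public class ProxyChoiceSketch {
      static class Node {
        final String name, rack, nodeGroup;
        final boolean busy;
        Node(String name, String rack, String nodeGroup, boolean busy) {
          this.name = name; this.rack = rack;
          this.nodeGroup = nodeGroup; this.busy = busy;
        }
      }

      // Three passes, first success wins; each pass widens the search.
      static Node chooseProxy(List<Node> locations, Node target, boolean nodeGroupAware) {
        if (nodeGroupAware) {
          for (Node loc : locations) {
            if (loc.nodeGroup.equals(target.nodeGroup) && !loc.busy) return loc;
          }
        }
        for (Node loc : locations) {
          if (loc.rack.equals(target.rack) && !loc.busy) return loc;
        }
        for (Node loc : locations) {
          if (!loc.busy) return loc;
        }
        return null; // no usable proxy source
      }

      public static void main(String[] args) {
        Node target = new Node("target", "r1", "g1", false);
        List<Node> locs = Arrays.asList(
            new Node("a", "r2", "g9", false),  // off-rack replica
            new Node("b", "r1", "g2", false),  // same rack as target
            new Node("c", "r1", "g1", true));  // same node group, but busy
        System.out.println(chooseProxy(locs, target, true).name);  // prints "b"
      }
    }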

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java Fri Dec 13 17:28:14 2013
@@ -75,7 +75,7 @@ public interface BlockCollection {
    * and set the locations.
    */
   public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
-      DatanodeDescriptor[] locations) throws IOException;
+      DatanodeStorageInfo[] targets) throws IOException;
 
   /**
    * @return whether the block collection is under construction.

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Fri Dec 13 17:28:14 2013
@@ -21,6 +21,7 @@ import java.util.LinkedList;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.util.LightWeightGSet;
 
@@ -39,11 +40,11 @@ public class BlockInfo extends Block imp
   private LightWeightGSet.LinkedElement nextLinkedElement;
 
   /**
-   * This array contains triplets of references. For each i-th datanode the
-   * block belongs to triplets[3*i] is the reference to the DatanodeDescriptor
-   * and triplets[3*i+1] and triplets[3*i+2] are references to the previous and
-   * the next blocks, respectively, in the list of blocks belonging to this
-   * data-node.
+   * This array contains triplets of references. For the i-th storage the
+   * block belongs to, triplets[3*i] is the reference to the
+   * {@link DatanodeStorageInfo}, and triplets[3*i+1] and triplets[3*i+2] are
+   * references to the previous and the next blocks, respectively, in the list
+   * of blocks belonging to this storage.
    * 
    * Using previous and next in Object triplets is done instead of a
    * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
@@ -86,9 +87,14 @@ public class BlockInfo extends Block imp
   }
 
   public DatanodeDescriptor getDatanode(int index) {
+    DatanodeStorageInfo storage = getStorageInfo(index);
+    return storage == null ? null : storage.getDatanodeDescriptor();
+  }
+
+  DatanodeStorageInfo getStorageInfo(int index) {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
-    return (DatanodeDescriptor)triplets[index*3];
+    return (DatanodeStorageInfo)triplets[index*3];
   }
 
   private BlockInfo getPrevious(int index) {
@@ -111,14 +117,10 @@ public class BlockInfo extends Block imp
     return info;
   }
 
-  private void setDatanode(int index, DatanodeDescriptor node, BlockInfo previous,
-      BlockInfo next) {
+  private void setStorageInfo(int index, DatanodeStorageInfo storage) {
     assert this.triplets != null : "BlockInfo is not initialized";
-    int i = index * 3;
-    assert index >= 0 && i+2 < triplets.length : "Index is out of bound";
-    triplets[i] = node;
-    triplets[i+1] = previous;
-    triplets[i+2] = next;
+    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
+    triplets[index*3] = storage;
   }
 
   /**
@@ -190,22 +192,34 @@ public class BlockInfo extends Block imp
   }
 
   /**
-   * Add data-node this block belongs to.
+   * Add a {@link DatanodeStorageInfo} location for a block
    */
-  public boolean addNode(DatanodeDescriptor node) {
-    if(findDatanode(node) >= 0) // the node is already there
-      return false;
+  boolean addStorage(DatanodeStorageInfo storage) {
+    boolean added = true;
+    int idx = findDatanode(storage.getDatanodeDescriptor());
+    if(idx >= 0) {
+      if (getStorageInfo(idx) == storage) { // the storage is already there
+        return false;
+      } else {
+        // The block is on the DN but belongs to a different storage.
+        // Update our state.
+        removeStorage(getStorageInfo(idx));
+        added = false;      // Just updating storage. Return false.
+      }
+    }
     // find the last null node
     int lastNode = ensureCapacity(1);
-    setDatanode(lastNode, node, null, null);
-    return true;
+    setStorageInfo(lastNode, storage);
+    setNext(lastNode, null);
+    setPrevious(lastNode, null);
+    return added;
   }
 
   /**
-   * Remove data-node from the block.
+   * Remove {@link DatanodeStorageInfo} location for a block
    */
-  public boolean removeNode(DatanodeDescriptor node) {
-    int dnIndex = findDatanode(node);
+  boolean removeStorage(DatanodeStorageInfo storage) {
+    int dnIndex = findStorageInfo(storage);
     if(dnIndex < 0) // the node is not found
       return false;
     assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : 
@@ -213,10 +227,13 @@ public class BlockInfo extends Block imp
     // find the last not null node
     int lastNode = numNodes()-1; 
     // replace current node triplet by the lastNode one 
-    setDatanode(dnIndex, getDatanode(lastNode), getPrevious(lastNode),
-        getNext(lastNode));
+    setStorageInfo(dnIndex, getStorageInfo(lastNode));
+    setNext(dnIndex, getNext(lastNode)); 
+    setPrevious(dnIndex, getPrevious(lastNode)); 
     // set the last triplet to null
-    setDatanode(lastNode, null, null, null);
+    setStorageInfo(lastNode, null);
+    setNext(lastNode, null); 
+    setPrevious(lastNode, null); 
     return true;
   }
 
@@ -236,37 +253,70 @@ public class BlockInfo extends Block imp
     }
     return -1;
   }
+
+  /**
+   * Find the storage entry for the specified datanode.
+   * @param dn the datanode to look up
+   * @return index or -1 if not found.
+   */
+  int findStorageInfo(DatanodeInfo dn) {
+    int len = getCapacity();
+    for(int idx = 0; idx < len; idx++) {
+      DatanodeStorageInfo cur = getStorageInfo(idx);
+      if(cur == null)
+        break;
+      if(cur.getDatanodeDescriptor() == dn)
+        return idx;
+    }
+    return -1;
+  }
+  
+  /**
+   * Find the specified DatanodeStorageInfo.
+   * @param storageInfo the storage to look up
+   * @return index or -1 if not found.
+   */
+  int findStorageInfo(DatanodeStorageInfo storageInfo) {
+    int len = getCapacity();
+    for(int idx = 0; idx < len; idx++) {
+      DatanodeStorageInfo cur = getStorageInfo(idx);
+      if(cur == storageInfo)
+        return idx;
+      if(cur == null)
+        break;
+    }
+    return -1;
+  }
 
   /**
    * Insert this block into the head of the list of blocks 
-   * related to the specified DatanodeDescriptor.
+   * related to the specified DatanodeStorageInfo.
    * If the head is null then form a new list.
    * @return current block as the new head of the list.
    */
-  public BlockInfo listInsert(BlockInfo head, DatanodeDescriptor dn) {
-    int dnIndex = this.findDatanode(dn);
+  BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
+    int dnIndex = this.findStorageInfo(storage);
     assert dnIndex >= 0 : "Data node is not found: current";
     assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : 
             "Block is already in the list and cannot be inserted.";
     this.setPrevious(dnIndex, null);
     this.setNext(dnIndex, head);
     if(head != null)
-      head.setPrevious(head.findDatanode(dn), this);
+      head.setPrevious(head.findStorageInfo(storage), this);
     return this;
   }
 
   /**
    * Remove this block from the list of blocks 
-   * related to the specified DatanodeDescriptor.
+   * related to the specified DatanodeStorageInfo.
    * If this block is the head of the list then return the next block as 
    * the new head.
    * @return the new head of the list or null if the list becomes
   * empty after deletion.
    */
-  public BlockInfo listRemove(BlockInfo head, DatanodeDescriptor dn) {
+  BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
     if(head == null)
       return null;
-    int dnIndex = this.findDatanode(dn);
+    int dnIndex = this.findStorageInfo(storage);
     if(dnIndex < 0) // this block is not on the data-node list
       return head;
 
@@ -275,9 +325,9 @@ public class BlockInfo extends Block imp
     this.setNext(dnIndex, null);
     this.setPrevious(dnIndex, null);
     if(prev != null)
-      prev.setNext(prev.findDatanode(dn), next);
+      prev.setNext(prev.findStorageInfo(storage), next);
     if(next != null)
-      next.setPrevious(next.findDatanode(dn), prev);
+      next.setPrevious(next.findStorageInfo(storage), prev);
     if(this == head)  // removing the head
       head = next;
     return head;
@@ -289,7 +339,7 @@ public class BlockInfo extends Block imp
    *
    * @return the new head of the list.
    */
-  public BlockInfo moveBlockToHead(BlockInfo head, DatanodeDescriptor dn,
+  public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
       int curIndex, int headIndex) {
     if (head == this) {
       return this;
@@ -298,9 +348,9 @@ public class BlockInfo extends Block imp
     BlockInfo prev = this.setPrevious(curIndex, null);
 
     head.setPrevious(headIndex, this);
-    prev.setNext(prev.findDatanode(dn), next);
+    prev.setNext(prev.findStorageInfo(storage), next);
     if (next != null)
-      next.setPrevious(next.findDatanode(dn), prev);
+      next.setPrevious(next.findStorageInfo(storage), prev);
     return this;
   }
 
@@ -328,10 +378,10 @@ public class BlockInfo extends Block imp
    * @return BlockInfoUnderConstruction -  an under construction block.
    */
   public BlockInfoUnderConstruction convertToBlockUnderConstruction(
-      BlockUCState s, DatanodeDescriptor[] targets) {
+      BlockUCState s, DatanodeStorageInfo[] targets) {
     if(isComplete()) {
-      return new BlockInfoUnderConstruction(
-          this, getBlockCollection().getBlockReplication(), s, targets);
+      return new BlockInfoUnderConstruction(this,
+          getBlockCollection().getBlockReplication(), s, targets);
     }
     // the block is already under construction
     BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;
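
The revised triplets javadoc above describes the data structure that makes these
list operations cheap: for the i-th location, a flat Object[] holds the storage
reference at 3*i and the previous/next links of that storage's block list at 3*i+1
and 3*i+2, avoiding a separate LinkedList node per entry. A toy sketch of just that
index arithmetic (not the real BlockInfo):

    // Toy model of the triplets layout: for location i,
    //   triplets[3*i]   -> storage reference,
    //   triplets[3*i+1] -> previous block in that storage's list,
    //   triplets[3*i+2] -> next block in that storage's list.
    public class TripletsSketch {
      private final Object[] triplets;

      TripletsSketch(int capacity) {
        triplets = new Object[3 * capacity];
      }

      Object getStorage(int i)          { return triplets[3 * i]; }
      void setStorage(int i, Object s)  { triplets[3 * i] = s; }
      Object getPrevious(int i)         { return triplets[3 * i + 1]; }
      void setPrevious(int i, Object b) { triplets[3 * i + 1] = b; }
      Object getNext(int i)             { return triplets[3 * i + 2]; }
      void setNext(int i, Object b)     { triplets[3 * i + 2] = b; }

      public static void main(String[] args) {
        TripletsSketch block = new TripletsSketch(2);
        block.setStorage(0, "storage-A");
        block.setStorage(1, "storage-B");
        // One flat array serves both locations; no per-entry list objects.
        System.out.println(block.getStorage(0) + ", " + block.getStorage(1));
      }
    }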

Modified: hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1550774&r1=1550773&r2=1550774&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-4685/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Fri Dec 13 17:28:14 2013
@@ -63,12 +63,12 @@ public class BlockInfoUnderConstruction 
    * corresponding replicas.
    */
   static class ReplicaUnderConstruction extends Block {
-    private DatanodeDescriptor expectedLocation;
+    private final DatanodeStorageInfo expectedLocation;
     private ReplicaState state;
     private boolean chosenAsPrimary;
 
     ReplicaUnderConstruction(Block block,
-                             DatanodeDescriptor target,
+                             DatanodeStorageInfo target,
                              ReplicaState state) {
       super(block);
       this.expectedLocation = target;
@@ -82,7 +82,7 @@ public class BlockInfoUnderConstruction 
      * It is not guaranteed, but expected, that the data-node actually has
      * the replica.
      */
-    DatanodeDescriptor getExpectedLocation() {
+    private DatanodeStorageInfo getExpectedStorageLocation() {
       return expectedLocation;
     }
 
@@ -118,7 +118,7 @@ public class BlockInfoUnderConstruction 
      * Is data-node the replica belongs to alive.
      */
     boolean isAlive() {
-      return expectedLocation.isAlive;
+      return expectedLocation.getDatanodeDescriptor().isAlive;
     }
 
     @Override // Block
@@ -162,7 +162,7 @@ public class BlockInfoUnderConstruction 
    */
   public BlockInfoUnderConstruction(Block blk, int replication,
                              BlockUCState state,
-                             DatanodeDescriptor[] targets) {
+                             DatanodeStorageInfo[] targets) {
     super(blk, replication);
     assert getBlockUCState() != BlockUCState.COMPLETE :
       "BlockInfoUnderConstruction cannot be in COMPLETE state";
@@ -186,7 +186,7 @@ public class BlockInfoUnderConstruction 
   }
 
   /** Set expected locations */
-  public void setExpectedLocations(DatanodeDescriptor[] targets) {
+  public void setExpectedLocations(DatanodeStorageInfo[] targets) {
     int numLocations = targets == null ? 0 : targets.length;
     this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
     for(int i = 0; i < numLocations; i++)
@@ -198,12 +198,12 @@ public class BlockInfoUnderConstruction 
    * Create array of expected replica locations
    * (as has been assigned by chooseTargets()).
    */
-  public DatanodeDescriptor[] getExpectedLocations() {
+  public DatanodeStorageInfo[] getExpectedStorageLocations() {
     int numLocations = replicas == null ? 0 : replicas.size();
-    DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocations];
+    DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
     for(int i = 0; i < numLocations; i++)
-      locations[i] = replicas.get(i).getExpectedLocation();
-    return locations;
+      storages[i] = replicas.get(i).getExpectedStorageLocation();
+    return storages;
   }
 
   /** Get the number of expected locations */
@@ -244,9 +244,9 @@ public class BlockInfoUnderConstruction 
     // The replica list is unchanged.
     for (ReplicaUnderConstruction r : replicas) {
       if (genStamp != r.getGenerationStamp()) {
-        r.getExpectedLocation().removeBlock(this);
+        r.getExpectedStorageLocation().removeBlock(this);
         NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
-            + "from location: " + r.getExpectedLocation());
+            + "from location: " + r.getExpectedStorageLocation());
       }
     }
   }
@@ -302,31 +302,44 @@ public class BlockInfoUnderConstruction 
       if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
         continue;
       }
-      if (replicas.get(i).getExpectedLocation().getLastUpdate() > mostRecentLastUpdate) {
-        primary = replicas.get(i);
+      final ReplicaUnderConstruction ruc = replicas.get(i);
+      final long lastUpdate = ruc.getExpectedStorageLocation().getDatanodeDescriptor().getLastUpdate(); 
+      if (lastUpdate > mostRecentLastUpdate) {
         primaryNodeIndex = i;
-        mostRecentLastUpdate = primary.getExpectedLocation().getLastUpdate();
+        primary = ruc;
+        mostRecentLastUpdate = lastUpdate;
       }
     }
     if (primary != null) {
-      primary.getExpectedLocation().addBlockToBeRecovered(this);
+      primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this);
       primary.setChosenAsPrimary(true);
       NameNode.blockStateChangeLog.info("BLOCK* " + this
         + " recovery started, primary=" + primary);
     }
   }
 
-  void addReplicaIfNotPresent(DatanodeDescriptor dn,
+  void addReplicaIfNotPresent(DatanodeStorageInfo storage,
                      Block block,
                      ReplicaState rState) {
-    for (ReplicaUnderConstruction r : replicas) {
-      if (r.getExpectedLocation() == dn) {
+    Iterator<ReplicaUnderConstruction> it = replicas.iterator();
+    while (it.hasNext()) {
+      ReplicaUnderConstruction r = it.next();
+      if(r.getExpectedStorageLocation() == storage) {
         // Record the gen stamp from the report
         r.setGenerationStamp(block.getGenerationStamp());
         return;
+      } else if (r.getExpectedStorageLocation().getDatanodeDescriptor() ==
+          storage.getDatanodeDescriptor()) {
+
+        // The Datanode reported that the block is on a different storage
+        // than the one chosen by BlockPlacementPolicy. This can occur as
+        // we allow Datanodes to choose the target storage. Update our
+        // state by removing the stale entry and adding a new one.
+        it.remove();
+        break;
       }
     }
-    replicas.add(new ReplicaUnderConstruction(block, dn, rState));
+    replicas.add(new ReplicaUnderConstruction(block, storage, rState));
   }
 
   @Override // BlockInfo
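
The addReplicaIfNotPresent change above encodes one rule: if the reporting storage
is already the expected location, only refresh the generation stamp; if the same
datanode reports from a different storage (the DN picked its own target storage),
drop the stale entry before appending the new one. A simplified standalone sketch
of that update rule (Storage and Replica are stand-ins, not the real HDFS classes):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class ReplicaUpdateSketch {
      static class Storage {
        final String datanode, storageId;
        Storage(String datanode, String storageId) {
          this.datanode = datanode; this.storageId = storageId;
        }
      }
      static class Replica {
        final Storage storage;
        long genStamp;
        Replica(Storage storage, long genStamp) {
          this.storage = storage; this.genStamp = genStamp;
        }
      }

      static void addReplicaIfNotPresent(List<Replica> replicas,
          Storage storage, long genStamp) {
        Iterator<Replica> it = replicas.iterator();
        while (it.hasNext()) {
          Replica r = it.next();
          if (r.storage == storage) {
            r.genStamp = genStamp;  // same storage: just record the reported stamp
            return;
          } else if (r.storage.datanode.equals(storage.datanode)) {
            // Same datanode, different storage: remove the stale entry and
            // fall through to append the freshly reported one.
            it.remove();
            break;
          }
        }
        replicas.add(new Replica(storage, genStamp));
      }

      public static void main(String[] args) {
        List<Replica> replicas = new ArrayList<Replica>();
        addReplicaIfNotPresent(replicas, new Storage("dn1", "disk-0"), 100L);
        addReplicaIfNotPresent(replicas, new Storage("dn1", "ssd-0"), 101L);
        // The ssd-0 entry replaced the stale disk-0 entry for dn1.
        System.out.println(replicas.size() + " replica(s) on "
            + replicas.get(0).storage.storageId);
      }
    }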


