hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1365792 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/qjournal/client/ src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/ src/main/java/org/apache/hadoop/hdfs/qjournal/...
Date Wed, 25 Jul 2012 21:44:26 GMT
Author: todd
Date: Wed Jul 25 21:44:26 2012
New Revision: 1365792

URL: http://svn.apache.org/viewvc?rev=1365792&view=rev
Log:
HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs. Contributed by Todd
Lipcon.

Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
(original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
Wed Jul 25 21:44:26 2012
@@ -4,3 +4,5 @@ This will be merged into the main CHANGE
 HDFS-3077. Quorum-based protocol for reading and writing edit logs. Contributed by Todd Lipcon
based on initial work from Brandon Li and Hari Mankude.
 
 HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary (todd)
+
+HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs (todd)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
(original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
Wed Jul 25 21:44:26 2012
@@ -69,6 +69,12 @@ interface AsyncLogger {
       long startTxId, long endTxId);
 
   /**
+   * Allow the remote node to purge edit logs earlier than this.
+   * @param minTxIdToKeep the min txid which must be retained
+   */
+  public ListenableFuture<Void> purgeLogsOlderThan(long minTxIdToKeep);
+
+  /**
    * @return the state of the last epoch on the target node.
    */
   public ListenableFuture<GetJournalStateResponseProto> getJournalState();

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
(original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
Wed Jul 25 21:44:26 2012
@@ -113,6 +113,12 @@ class AsyncLoggerSet {
       logger.close();
     }
   }
+  
+  void purgeLogsOlderThan(long minTxIdToKeep) {
+    for (AsyncLogger logger : loggers) {
+      logger.purgeLogsOlderThan(minTxIdToKeep);
+    }
+  }
 
 
   /**

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
(original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
Wed Jul 25 21:44:26 2012
@@ -292,6 +292,17 @@ public class IPCLoggerChannel implements
   }
   
   @Override
+  public ListenableFuture<Void> purgeLogsOlderThan(final long minTxIdToKeep) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep);
+        return null;
+      }
+    });
+  }
+
+  @Override
   public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
       final long fromTxnId) {
     return executor.submit(new Callable<RemoteEditLogManifest>() {

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
(original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
Wed Jul 25 21:44:26 2012
@@ -298,7 +298,10 @@ public class QuorumJournalManager implem
 
   @Override
   public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
-    // TODO Auto-generated method stub
+    // This purges asynchronously -- there's no need to wait for a quorum
+    // here, because it's always OK to fail.
+    LOG.info("Purging remote journals older than txid " + minTxIdToKeep);
+    loggers.purgeLogsOlderThan(minTxIdToKeep);
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
(original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
Wed Jul 25 21:44:26 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.security.KerberosInfo;
 
@@ -83,7 +84,7 @@ public interface QJournalProtocol {
    * Finalize the given log segment on the JournalNode. The segment
    * is expected to be in-progress and starting at the given startTxId.
    *
-   * @param startTxId the starting transaction ID of teh log
+   * @param startTxId the starting transaction ID of the log
    * @param endTxId the expected last transaction in the given log
    * @throws IOException if no such segment exists
    */
@@ -91,6 +92,13 @@ public interface QJournalProtocol {
       long startTxId, long endTxId) throws IOException;
 
   /**
+   * @throws IOException 
+   * @see JournalManager#purgeLogsOlderThan(long)
+   */
+  public void purgeLogsOlderThan(RequestInfo requestInfo, long minTxIdToKeep)
+      throws IOException;
+  
+  /**
    * @param jid the journal from which to enumerate edits
    * @param sinceTxId the first transaction which the client cares about
    * @return a list of edit log segments since the given transaction ID.
@@ -110,5 +118,4 @@ public interface QJournalProtocol {
    */
   public void acceptRecovery(RequestInfo reqInfo,
       SegmentStateProto stateToAccept, URL fromUrl) throws IOException;
-
 }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
(original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
Wed Jul 25 21:44:26 2012
@@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
@@ -127,6 +129,18 @@ public class QJournalProtocolServerSideT
     }
     return FinalizeLogSegmentResponseProto.newBuilder().build();
   }
+  
+  @Override
+  public PurgeLogsResponseProto purgeLogs(RpcController controller,
+      PurgeLogsRequestProto req) throws ServiceException {
+    try {
+      impl.purgeLogsOlderThan(convert(req.getReqInfo()),
+          req.getMinTxIdToKeep());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return PurgeLogsResponseProto.getDefaultInstance();
+  }
 
   @Override
   public GetEditLogManifestResponseProto getEditLogManifest(

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
(original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
Wed Jul 25 21:44:26 2012
@@ -23,7 +23,6 @@ import java.net.URL;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
@@ -39,18 +38,17 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcClientUtil;
 
-import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
@@ -165,6 +163,20 @@ public class QJournalProtocolTranslatorP
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+  
+  @Override
+  public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
+      throws IOException {
+    PurgeLogsRequestProto req = PurgeLogsRequestProto.newBuilder()
+        .setReqInfo(convert(reqInfo))
+        .setMinTxIdToKeep(minTxIdToKeep)
+        .build();
+    try {
+      rpcProxy.purgeLogs(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 
   @Override
   public GetEditLogManifestResponseProto getEditLogManifest(String jid,
@@ -214,4 +226,5 @@ public class QJournalProtocolTranslatorP
         QJournalProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
         RPC.getProtocolVersion(QJournalProtocolPB.class), methodName);
   }
+
 }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
(original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
Wed Jul 25 21:44:26 2012
@@ -92,7 +92,7 @@ class JNStorage extends Storage {
     return new File(getPaxosDir(), String.valueOf(segmentTxId));
   }
   
-  private File getPaxosDir() {
+  File getPaxosDir() {
     return new File(sd.getCurrentDir(), "paxos");
   }
 

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
(original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
Wed Jul 25 21:44:26 2012
@@ -28,6 +28,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
 import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@@ -277,6 +279,41 @@ class Journal implements Closeable {
   }
   
   /**
+   * @see JournalManager#purgeLogsOlderThan(long)
+   */
+  public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
+      long minTxIdToKeep) throws IOException {
+    checkRequest(reqInfo);
+    
+    fjm.purgeLogsOlderThan(minTxIdToKeep);
+    purgePaxosDecisionsOlderThan(minTxIdToKeep);
+  }
+  
+  private void purgePaxosDecisionsOlderThan(long minTxIdToKeep)
+      throws IOException {
+    File dir = storage.getPaxosDir();
+    for (File f : FileUtil.listFiles(dir)) {
+      if (!f.isFile()) continue;
+      
+      long txid;
+      try {
+        txid = Long.valueOf(f.getName());
+      } catch (NumberFormatException nfe) {
+        LOG.warn("Unexpected non-numeric file name for " + f.getAbsolutePath());
+        continue;
+      }
+      
+      if (txid < minTxIdToKeep) {
+        if (!f.delete()) {
+          LOG.warn("Unable to delete no-longer-needed paxos decision record " +
+              f);
+        }
+      }
+    }
+  }
+
+
+  /**
    * @see QJournalProtocol#getEditLogManifest(String, long)
    */
   public RemoteEditLogManifest getEditLogManifest(long sinceTxId)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
(original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
Wed Jul 25 21:44:26 2012
@@ -131,6 +131,13 @@ class JournalNodeRpcServer implements QJ
   }
 
   @Override
+  public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
+      throws IOException {
+    jn.getOrCreateJournal(reqInfo.getJournalId())
+      .purgeLogsOlderThan(reqInfo, minTxIdToKeep);
+  }
+
+  @Override
   public GetEditLogManifestResponseProto getEditLogManifest(String jid,
       long sinceTxId) throws IOException {
     

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
(original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
Wed Jul 25 21:44:26 2012
@@ -87,6 +87,17 @@ message FinalizeLogSegmentResponseProto 
 }
 
 /**
+ * purgeLogs()
+ */
+message PurgeLogsRequestProto {
+  required RequestInfoProto reqInfo = 1;
+  required uint64 minTxIdToKeep = 2;
+}
+
+message PurgeLogsResponseProto {
+}
+
+/**
  * getJournalState()
  */
 message GetJournalStateRequestProto {
@@ -175,6 +186,9 @@ service QJournalProtocolService {
   rpc finalizeLogSegment(FinalizeLogSegmentRequestProto)
       returns (FinalizeLogSegmentResponseProto);
 
+  rpc purgeLogs(PurgeLogsRequestProto)
+      returns (PurgeLogsResponseProto);
+
   rpc getEditLogManifest(GetEditLogManifestRequestProto)
       returns (GetEditLogManifestResponseProto);
 

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1365792&r1=1365791&r2=1365792&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
(original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
Wed Jul 25 21:44:26 2012
@@ -346,6 +346,45 @@ public class TestQuorumJournalManager {
     checkRecovery(cluster, 1, 4);
   }
   
+  @Test
+  public void testPurgeLogs() throws Exception {
+    for (int txid = 1; txid <= 5; txid++) {
+      writeSegment(qjm, txid, 1, true);
+    }
+    File curDir = cluster.getCurrentDir(0, JID);
+    GenericTestUtils.assertGlobEquals(curDir, "edits_.*",
+        NNStorage.getFinalizedEditsFileName(1, 1),
+        NNStorage.getFinalizedEditsFileName(2, 2),
+        NNStorage.getFinalizedEditsFileName(3, 3),
+        NNStorage.getFinalizedEditsFileName(4, 4),
+        NNStorage.getFinalizedEditsFileName(5, 5));
+    File paxosDir = new File(curDir, "paxos");
+    GenericTestUtils.assertExists(paxosDir);
+
+    // Create new files in the paxos directory, which should get purged too.
+    assertTrue(new File(paxosDir, "1").createNewFile());
+    assertTrue(new File(paxosDir, "3").createNewFile());
+    
+    GenericTestUtils.assertGlobEquals(paxosDir, "\\d+",
+        "1", "3");
+    
+    qjm.purgeLogsOlderThan(3);
+    
+    // Log purging is asynchronous, so we have to wait for the calls
+    // to be sent and respond before verifying.
+    waitForAllPendingCalls(qjm.getLoggerSetForTests());
+    
+    // Older edits should be purged
+    GenericTestUtils.assertGlobEquals(curDir, "edits_.*",
+        NNStorage.getFinalizedEditsFileName(3, 3),
+        NNStorage.getFinalizedEditsFileName(4, 4),
+        NNStorage.getFinalizedEditsFileName(5, 5));
+   
+    // Older paxos files should be purged
+    GenericTestUtils.assertGlobEquals(paxosDir, "\\d+",
+        "3");
+  }
+  
   
   private QuorumJournalManager createSpyingQJM()
       throws IOException, URISyntaxException {
@@ -385,6 +424,14 @@ public class TestQuorumJournalManager {
     stm.setReadyToFlush();
     stm.flush();
   }
+  
+  private static void waitForAllPendingCalls(AsyncLoggerSet als)
+      throws InterruptedException {
+    for (AsyncLogger l : als.getLoggersForTests()) {
+      IPCLoggerChannel ch = (IPCLoggerChannel)l;
+      ch.waitForAllPendingCalls();
+    }
+  }
 
   private void assertExistsInQuorum(MiniJournalCluster cluster,
       String fname) {



Mime
View raw message