Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D8A767B47 for ; Tue, 26 Jul 2011 20:47:24 +0000 (UTC) Received: (qmail 4635 invoked by uid 500); 26 Jul 2011 20:47:24 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 4606 invoked by uid 500); 26 Jul 2011 20:47:24 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 4598 invoked by uid 99); 26 Jul 2011 20:47:23 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Jul 2011 20:47:23 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Jul 2011 20:47:20 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 220672388894; Tue, 26 Jul 2011 20:47:00 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1151238 - in /hadoop/common/trunk/hdfs: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ Date: Tue, 26 Jul 2011 20:46:59 -0000 To: hdfs-commits@hadoop.apache.org From: todd@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110726204700.220672388894@eris.apache.org> Author: todd Date: Tue Jul 26 20:46:58 2011 New Revision: 1151238 URL: http://svn.apache.org/viewvc?rev=1151238&view=rev Log: HDFS-2149. Move EditLogOp serialization formats into FsEditLogOp implementations. Contributed by Ivan Kelly. Modified: hadoop/common/trunk/hdfs/CHANGES.txt hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Modified: hadoop/common/trunk/hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1151238&r1=1151237&r2=1151238&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hdfs/CHANGES.txt Tue Jul 26 20:46:58 2011 @@ -601,6 +601,9 @@ Trunk (unreleased changes) HDFS-2180. Refactor NameNode HTTP server into new class. (todd) + HDFS-2149. Move EditLogOp serialization formats into FsEditLogOp + implementations. (Ivan Kelly via todd) + OPTIMIZATIONS HDFS-1458. Improve checkpoint performance by avoiding unnecessary image Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1151238&r1=1151237&r2=1151238&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Tue Jul 26 20:46:58 2011 @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.DataOutputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -45,25 +46,18 @@ class EditLogBackupOutputStream extends private NamenodeProtocol backupNode; // RPC proxy to backup node private NamenodeRegistration bnRegistration; // backup node registration private NamenodeRegistration nnRegistration; // active node registration - private ArrayList bufCurrent; // current buffer for writing - private ArrayList bufReady; // buffer ready for flushing + private ArrayList bufCurrent; // current buffer for writing + private ArrayList bufReady; // buffer ready for flushing private DataOutputBuffer out; // serialized output sent to backup node - static class JournalRecord { - byte op; - Writable[] args; - - JournalRecord(byte op, Writable ... writables) { - this.op = op; - this.args = writables; - } - - void write(DataOutputStream out) throws IOException { - out.write(op); - if(args == null) - return; - for(Writable w : args) - w.write(out); + + private static class BufferedOp { + public final FSEditLogOpCodes opCode; + public final byte[] bytes; + + public BufferedOp(FSEditLogOpCodes opCode, byte[] bytes) { + this.opCode = opCode; + this.bytes = bytes; } } @@ -84,8 +78,8 @@ class EditLogBackupOutputStream extends Storage.LOG.error("Error connecting to: " + bnAddress, e); throw e; } - this.bufCurrent = new ArrayList(); - this.bufReady = new ArrayList(); + this.bufCurrent = new ArrayList(); + this.bufReady = new ArrayList(); this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE); } @@ -100,13 +94,18 @@ class EditLogBackupOutputStream extends } @Override // EditLogOutputStream - public void write(int b) throws IOException { - throw new IOException("Not implemented"); + void write(FSEditLogOp op) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream s = new DataOutputStream(baos); + FSEditLogOp.Writer w = new FSEditLogOp.Writer(s); + w.writeOp(op); + + bufCurrent.add(new BufferedOp(op.opCode, baos.toByteArray())); } - @Override // EditLogOutputStream - void write(byte op, Writable ... writables) throws IOException { - bufCurrent.add(new JournalRecord(op, writables)); + @Override + void writeRaw(byte[] bytes, int offset, int length) throws IOException { + throw new IOException("Not supported"); } /** @@ -134,7 +133,7 @@ class EditLogBackupOutputStream extends @Override // EditLogOutputStream void setReadyToFlush() throws IOException { assert bufReady.size() == 0 : "previous data is not flushed yet"; - ArrayList tmp = bufReady; + ArrayList tmp = bufReady; bufReady = bufCurrent; bufCurrent = tmp; } @@ -144,12 +143,13 @@ class EditLogBackupOutputStream extends assert out.size() == 0 : "Output buffer is not empty"; int bufReadySize = bufReady.size(); for(int idx = 0; idx < bufReadySize; idx++) { - JournalRecord jRec = null; + BufferedOp jRec = null; for(; idx < bufReadySize; idx++) { jRec = bufReady.get(idx); - if(jRec.op >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode()) + if(jRec.opCode.getOpCode() + >= FSEditLogOpCodes.OP_JSPOOL_START.getOpCode()) break; // special operation should be sent in a separate call to BN - jRec.write(out); + out.write(jRec.bytes, 0, jRec.bytes.length); } if(out.size() > 0) send(NamenodeProtocol.JA_JOURNAL); @@ -157,8 +157,8 @@ class EditLogBackupOutputStream extends break; // operation like start journal spool or increment checkpoint time // is a separate call to BN - jRec.write(out); - send(jRec.op); + out.write(jRec.bytes, 0, jRec.bytes.length); + send(jRec.opCode.getOpCode()); } bufReady.clear(); // erase all data in the buffer out.reset(); // reset buffer to the start position Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1151238&r1=1151237&r2=1151238&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Tue Jul 26 20:46:58 2011 @@ -45,6 +45,7 @@ class EditLogFileOutputStream extends Ed private FileChannel fc; // channel of the file stream for sync private DataOutputBuffer bufCurrent; // current buffer for writing private DataOutputBuffer bufReady; // buffer ready for flushing + private FSEditLogOp.Writer writer; final private int initBufferSize; // inital buffer size static ByteBuffer fill = ByteBuffer.allocateDirect(1024 * 1024); // preallocation, 1MB @@ -70,6 +71,7 @@ class EditLogFileOutputStream extends Ed initBufferSize = size; bufCurrent = new DataOutputBuffer(size); bufReady = new DataOutputBuffer(size); + writer = new FSEditLogOp.Writer(bufCurrent); RandomAccessFile rp = new RandomAccessFile(name, "rw"); fp = new FileOutputStream(rp.getFD()); // open for append fc = rp.getChannel(); @@ -88,18 +90,11 @@ class EditLogFileOutputStream extends Ed /** {@inheritDoc} */ @Override - public void write(int b) throws IOException { - bufCurrent.write(b); - } - - /** {@inheritDoc} */ - @Override - void write(byte op, Writable... writables) throws IOException { + void write(FSEditLogOp op) throws IOException { int start = bufCurrent.getLength(); - write(op); - for (Writable w : writables) { - w.write(bufCurrent); - } + + writer.writeOp(op); + // write transaction checksum int end = bufCurrent.getLength(); Checksum checksum = FSEditLog.getChecksum(); @@ -109,6 +104,12 @@ class EditLogFileOutputStream extends Ed bufCurrent.writeInt(sum); } + /** {@inheritDoc} */ + @Override + void writeRaw(byte[] bytes, int offset, int length) throws IOException { + bufCurrent.write(bytes, offset, length); + } + /** * Create empty edits logs file. */ @@ -136,6 +137,7 @@ class EditLogFileOutputStream extends Ed } bufCurrent.close(); bufCurrent = null; + writer = null; } if(bufReady != null) { @@ -156,6 +158,7 @@ class EditLogFileOutputStream extends Ed } finally { IOUtils.cleanup(FSNamesystem.LOG, bufCurrent, bufReady, fc, fp); bufCurrent = bufReady = null; + writer = null; fc = null; fp = null; } @@ -168,10 +171,11 @@ class EditLogFileOutputStream extends Ed @Override void setReadyToFlush() throws IOException { assert bufReady.size() == 0 : "previous data is not flushed yet"; - write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker + bufCurrent.write(FSEditLogOpCodes.OP_INVALID.getOpCode()); // insert eof marker DataOutputBuffer tmp = bufReady; bufReady = bufCurrent; bufCurrent = tmp; + writer = new FSEditLogOp.Writer(bufCurrent); } /** Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1151238&r1=1151237&r2=1151238&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Tue Jul 26 20:46:58 2011 @@ -18,17 +18,14 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; -import java.io.OutputStream; import static org.apache.hadoop.hdfs.server.common.Util.now; -import org.apache.hadoop.io.Writable; /** * A generic abstract class to support journaling of edits logs into * a persistent storage. */ -abstract class EditLogOutputStream extends OutputStream -implements JournalStream { +abstract class EditLogOutputStream implements JournalStream { // these are statistics counters private long numSync; // number of sync(s) to disk private long totalTimeSync; // total time to sync @@ -37,19 +34,27 @@ implements JournalStream { numSync = totalTimeSync = 0; } - /** {@inheritDoc} */ - abstract public void write(int b) throws IOException; - /** - * Write edits log record into the stream. - * The record is represented by operation name and - * an array of Writable arguments. + * Write edits log operation to the stream. * * @param op operation - * @param writables array of Writable arguments * @throws IOException */ - abstract void write(byte op, Writable ... writables) throws IOException; + abstract void write(FSEditLogOp op) throws IOException; + + /** + * Write raw data to an edit log. This data should already have + * the transaction ID, checksum, etc included. It is for use + * within the BackupNode when replicating edits from the + * NameNode. + * + * @param bytes the bytes to write. + * @param offset offset in the bytes to write from + * @param length number of bytes to write + * @throws IOException + */ + abstract void writeRaw(byte[] bytes, int offset, int length) + throws IOException; /** * Create and initialize underlying persistent edits log storage. Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1151238&r1=1151237&r2=1151238&view=diff ============================================================================== --- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original) +++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Jul 26 20:46:58 2011 @@ -19,10 +19,12 @@ package org.apache.hadoop.hdfs.server.na import java.io.File; import java.io.IOException; +import java.io.DataOutputStream; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.zip.Checksum; +import java.util.zip.CheckedOutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,7 +32,6 @@ import org.apache.hadoop.classification. import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.DeprecatedUTF8; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.Storage; @@ -43,14 +44,13 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.PureJavaCrc32; import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*; /** * FSEditLog maintains a log of the namespace modifications. @@ -319,7 +319,7 @@ public class FSEditLog implements NNStor * Write an operation to the edit log. Do not sync to persistent * store yet. */ - void logEdit(FSEditLogOpCodes opCode, Writable ... writables) { + void logEdit(FSEditLogOp op) { synchronized (this) { // wait if an automatic sync is scheduled waitIfAutoSyncScheduled(); @@ -329,10 +329,10 @@ public class FSEditLog implements NNStor ArrayList errorStreams = null; long start = now(); for(EditLogOutputStream eStream : editStreams) { - if(!eStream.isOperationSupported(opCode.getOpCode())) + if(!eStream.isOperationSupported(op.opCode.getOpCode())) continue; try { - eStream.write(opCode.getOpCode(), writables); + eStream.write(op); } catch (IOException ie) { LOG.error("logEdit: removing "+ eStream.getName(), ie); if(errorStreams == null) @@ -585,49 +585,45 @@ public class FSEditLog implements NNStor * Records the block locations of the last block. */ public void logOpenFile(String path, INodeFileUnderConstruction newNode) { - - DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] { - new DeprecatedUTF8(path), - FSEditLog.toLogReplication(newNode.getReplication()), - FSEditLog.toLogLong(newNode.getModificationTime()), - FSEditLog.toLogLong(newNode.getAccessTime()), - FSEditLog.toLogLong(newNode.getPreferredBlockSize())}; - logEdit(OP_ADD, - new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair), - new ArrayWritable(Block.class, newNode.getBlocks()), - newNode.getPermissionStatus(), - new DeprecatedUTF8(newNode.getClientName()), - new DeprecatedUTF8(newNode.getClientMachine())); + AddOp op = AddOp.getInstance() + .setPath(path) + .setReplication(newNode.getReplication()) + .setModificationTime(newNode.getModificationTime()) + .setAccessTime(newNode.getAccessTime()) + .setBlockSize(newNode.getPreferredBlockSize()) + .setBlocks(newNode.getBlocks()) + .setPermissionStatus(newNode.getPermissionStatus()) + .setClientName(newNode.getClientName()) + .setClientMachine(newNode.getClientMachine()); + + logEdit(op); } /** * Add close lease record to edit log. */ public void logCloseFile(String path, INodeFile newNode) { - DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] { - new DeprecatedUTF8(path), - FSEditLog.toLogReplication(newNode.getReplication()), - FSEditLog.toLogLong(newNode.getModificationTime()), - FSEditLog.toLogLong(newNode.getAccessTime()), - FSEditLog.toLogLong(newNode.getPreferredBlockSize())}; - logEdit(OP_CLOSE, - new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair), - new ArrayWritable(Block.class, newNode.getBlocks()), - newNode.getPermissionStatus()); + CloseOp op = CloseOp.getInstance() + .setPath(path) + .setReplication(newNode.getReplication()) + .setModificationTime(newNode.getModificationTime()) + .setAccessTime(newNode.getAccessTime()) + .setBlockSize(newNode.getPreferredBlockSize()) + .setBlocks(newNode.getBlocks()) + .setPermissionStatus(newNode.getPermissionStatus()); + + logEdit(op); } /** * Add create directory record to edit log */ public void logMkDir(String path, INode newNode) { - DeprecatedUTF8 info[] = new DeprecatedUTF8[] { - new DeprecatedUTF8(path), - FSEditLog.toLogLong(newNode.getModificationTime()), - FSEditLog.toLogLong(newNode.getAccessTime()) - }; - logEdit(OP_MKDIR, - new ArrayWritable(DeprecatedUTF8.class, info), - newNode.getPermissionStatus()); + MkdirOp op = MkdirOp.getInstance() + .setPath(path) + .setTimestamp(newNode.getModificationTime()) + .setPermissionStatus(newNode.getPermissionStatus()); + logEdit(op); } /** @@ -635,33 +631,33 @@ public class FSEditLog implements NNStor * TODO: use String parameters until just before writing to disk */ void logRename(String src, String dst, long timestamp) { - DeprecatedUTF8 info[] = new DeprecatedUTF8[] { - new DeprecatedUTF8(src), - new DeprecatedUTF8(dst), - FSEditLog.toLogLong(timestamp)}; - logEdit(OP_RENAME_OLD, new ArrayWritable(DeprecatedUTF8.class, info)); + RenameOldOp op = RenameOldOp.getInstance() + .setSource(src) + .setDestination(dst) + .setTimestamp(timestamp); + logEdit(op); } /** * Add rename record to edit log */ void logRename(String src, String dst, long timestamp, Options.Rename... options) { - DeprecatedUTF8 info[] = new DeprecatedUTF8[] { - new DeprecatedUTF8(src), - new DeprecatedUTF8(dst), - FSEditLog.toLogLong(timestamp)}; - logEdit(OP_RENAME, - new ArrayWritable(DeprecatedUTF8.class, info), - toBytesWritable(options)); + RenameOp op = RenameOp.getInstance() + .setSource(src) + .setDestination(dst) + .setTimestamp(timestamp) + .setOptions(options); + logEdit(op); } /** * Add set replication record to edit log */ void logSetReplication(String src, short replication) { - logEdit(OP_SET_REPLICATION, - new DeprecatedUTF8(src), - FSEditLog.toLogReplication(replication)); + SetReplicationOp op = SetReplicationOp.getInstance() + .setPath(src) + .setReplication(replication); + logEdit(op); } /** Add set namespace quota record to edit log @@ -670,64 +666,69 @@ public class FSEditLog implements NNStor * @param quota the directory size limit */ void logSetQuota(String src, long nsQuota, long dsQuota) { - logEdit(OP_SET_QUOTA, - new DeprecatedUTF8(src), - new LongWritable(nsQuota), new LongWritable(dsQuota)); + SetQuotaOp op = SetQuotaOp.getInstance() + .setSource(src) + .setNSQuota(nsQuota) + .setDSQuota(dsQuota); + logEdit(op); } /** Add set permissions record to edit log */ void logSetPermissions(String src, FsPermission permissions) { - logEdit(OP_SET_PERMISSIONS, new DeprecatedUTF8(src), permissions); + SetPermissionsOp op = SetPermissionsOp.getInstance() + .setSource(src) + .setPermissions(permissions); + logEdit(op); } /** Add set owner record to edit log */ void logSetOwner(String src, String username, String groupname) { - DeprecatedUTF8 u = new DeprecatedUTF8(username == null? "": username); - DeprecatedUTF8 g = new DeprecatedUTF8(groupname == null? "": groupname); - logEdit(OP_SET_OWNER, new DeprecatedUTF8(src), u, g); + SetOwnerOp op = SetOwnerOp.getInstance() + .setSource(src) + .setUser(username) + .setGroup(groupname); + logEdit(op); } /** * concat(trg,src..) log */ void logConcat(String trg, String [] srcs, long timestamp) { - int size = 1 + srcs.length + 1; // trg, srcs, timestamp - DeprecatedUTF8 info[] = new DeprecatedUTF8[size]; - int idx = 0; - info[idx++] = new DeprecatedUTF8(trg); - for(int i=0; i> opInstances = + new ThreadLocal>() { + @Override + protected EnumMap initialValue() { + EnumMap instances + = new EnumMap(FSEditLogOpCodes.class); + instances.put(OP_ADD, new AddOp()); + instances.put(OP_CLOSE, new CloseOp()); + instances.put(OP_SET_REPLICATION, new SetReplicationOp()); + instances.put(OP_CONCAT_DELETE, new ConcatDeleteOp()); + instances.put(OP_RENAME_OLD, new RenameOldOp()); + instances.put(OP_DELETE, new DeleteOp()); + instances.put(OP_MKDIR, new MkdirOp()); + instances.put(OP_SET_GENSTAMP, new SetGenstampOp()); + instances.put(OP_DATANODE_ADD, new DatanodeAddOp()); + instances.put(OP_DATANODE_REMOVE, new DatanodeRemoveOp()); + instances.put(OP_SET_PERMISSIONS, new SetPermissionsOp()); + instances.put(OP_SET_OWNER, new SetOwnerOp()); + instances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp()); + instances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp()); + instances.put(OP_SET_QUOTA, new SetQuotaOp()); + instances.put(OP_TIMES, new TimesOp()); + instances.put(OP_SYMLINK, new SymlinkOp()); + instances.put(OP_RENAME, new RenameOp()); + instances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp()); + instances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp()); + instances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp()); + instances.put(OP_CANCEL_DELEGATION_TOKEN, + new CancelDelegationTokenOp()); + instances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp()); + instances.put(OP_CHECKPOINT_TIME, new CheckpointTimeOp()); + instances.put(OP_JSPOOL_START, new JSpoolStartOp()); + return instances; + } + }; + /** * Constructor for an EditLog Op. EditLog ops cannot be constructed * directly, but only through Reader#readOp. @@ -66,10 +106,14 @@ public abstract class FSEditLogOp { this.opCode = opCode; } - public abstract void readFields(DataInputStream in, int logVersion) + abstract void readFields(DataInputStream in, int logVersion) + throws IOException; + + abstract void writeFields(DataOutputStream out) throws IOException; - static class AddCloseOp extends FSEditLogOp { + @SuppressWarnings("unchecked") + static abstract class AddCloseOp extends FSEditLogOp { int length; String path; short replication; @@ -87,7 +131,71 @@ public abstract class FSEditLogOp { assert(opCode == OP_ADD || opCode == OP_CLOSE); } - public void readFields(DataInputStream in, int logVersion) + T setPath(String path) { + this.path = path; + return (T)this; + } + + T setReplication(short replication) { + this.replication = replication; + return (T)this; + } + + T setModificationTime(long mtime) { + this.mtime = mtime; + return (T)this; + } + + T setAccessTime(long atime) { + this.atime = atime; + return (T)this; + } + + T setBlockSize(long blockSize) { + this.blockSize = blockSize; + return (T)this; + } + + T setBlocks(Block[] blocks) { + this.blocks = blocks; + return (T)this; + } + + T setPermissionStatus(PermissionStatus permissions) { + this.permissions = permissions; + return (T)this; + } + + T setClientName(String clientName) { + this.clientName = clientName; + return (T)this; + } + + T setClientMachine(String clientMachine) { + this.clientMachine = clientMachine; + return (T)this; + } + + @Override + void writeFields(DataOutputStream out) throws IOException { + DeprecatedUTF8 nameReplicationPair[] = new DeprecatedUTF8[] { + new DeprecatedUTF8(path), + toLogReplication(replication), + toLogLong(mtime), + toLogLong(atime), + toLogLong(blockSize)}; + new ArrayWritable(DeprecatedUTF8.class, nameReplicationPair).write(out); + new ArrayWritable(Block.class, blocks).write(out); + permissions.write(out); + + if (this.opCode == OP_ADD) { + new DeprecatedUTF8(clientName).write(out); + new DeprecatedUTF8(clientMachine).write(out); + } + } + + @Override + void readFields(DataInputStream in, int logVersion) throws IOException { // versions > 0 support per file replication // get name and replication @@ -168,6 +276,26 @@ public abstract class FSEditLogOp { } } + static class AddOp extends AddCloseOp { + private AddOp() { + super(OP_ADD); + } + + static AddOp getInstance() { + return (AddOp)opInstances.get().get(OP_ADD); + } + } + + static class CloseOp extends AddCloseOp { + private CloseOp() { + super(OP_CLOSE); + } + + static CloseOp getInstance() { + return (CloseOp)opInstances.get().get(OP_CLOSE); + } + } + static class SetReplicationOp extends FSEditLogOp { String path; short replication; @@ -176,7 +304,29 @@ public abstract class FSEditLogOp { super(OP_SET_REPLICATION); } - public void readFields(DataInputStream in, int logVersion) + static SetReplicationOp getInstance() { + return (SetReplicationOp)opInstances.get() + .get(OP_SET_REPLICATION); + } + + SetReplicationOp setPath(String path) { + this.path = path; + return this; + } + + SetReplicationOp setReplication(short replication) { + this.replication = replication; + return this; + } + + @Override + void writeFields(DataOutputStream out) throws IOException { + new DeprecatedUTF8(path).write(out); + new DeprecatedUTF8(Short.toString(replication)).write(out); + } + + @Override + void readFields(DataInputStream in, int logVersion) throws IOException { this.path = FSImageSerialization.readString(in); this.replication = readShort(in); @@ -193,7 +343,41 @@ public abstract class FSEditLogOp { super(OP_CONCAT_DELETE); } - public void readFields(DataInputStream in, int logVersion) + static ConcatDeleteOp getInstance() { + return (ConcatDeleteOp)opInstances.get() + .get(OP_CONCAT_DELETE); + } + + ConcatDeleteOp setTarget(String trg) { + this.trg = trg; + return this; + } + + ConcatDeleteOp setSources(String[] srcs) { + this.srcs = srcs; + return this; + } + + ConcatDeleteOp setTimestamp(long timestamp) { + this.timestamp = timestamp; + return this; + } + + @Override + void writeFields(DataOutputStream out) throws IOException { + int size = 1 + srcs.length + 1; // trg, srcs, timestamp + DeprecatedUTF8 info[] = new DeprecatedUTF8[size]; + int idx = 0; + info[idx++] = new DeprecatedUTF8(trg); + for(int i=0; i opInstances; + /** * Construct the reader * @param in The stream to read from. @@ -650,32 +1380,6 @@ public abstract class FSEditLogOp { this.in = in; this.logVersion = logVersion; this.checksum = checksum; - opInstances = new EnumMap( - FSEditLogOpCodes.class); - opInstances.put(OP_ADD, new AddCloseOp(OP_ADD)); - opInstances.put(OP_CLOSE, new AddCloseOp(OP_CLOSE)); - opInstances.put(OP_SET_REPLICATION, new SetReplicationOp()); - opInstances.put(OP_CONCAT_DELETE, new ConcatDeleteOp()); - opInstances.put(OP_RENAME_OLD, new RenameOldOp()); - opInstances.put(OP_DELETE, new DeleteOp()); - opInstances.put(OP_MKDIR, new MkdirOp()); - opInstances.put(OP_SET_GENSTAMP, new SetGenstampOp()); - opInstances.put(OP_DATANODE_ADD, new DatanodeAddOp()); - opInstances.put(OP_DATANODE_REMOVE, new DatanodeRemoveOp()); - opInstances.put(OP_SET_PERMISSIONS, new SetPermissionsOp()); - opInstances.put(OP_SET_OWNER, new SetOwnerOp()); - opInstances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp()); - opInstances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp()); - opInstances.put(OP_SET_QUOTA, new SetQuotaOp()); - opInstances.put(OP_TIMES, new TimesOp()); - opInstances.put(OP_SYMLINK, new SymlinkOp()); - opInstances.put(OP_RENAME, new RenameOp()); - opInstances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp()); - opInstances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp()); - opInstances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp()); - opInstances.put(OP_CANCEL_DELEGATION_TOKEN, - new CancelDelegationTokenOp()); - opInstances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp()); } /** @@ -708,7 +1412,7 @@ public abstract class FSEditLogOp { return null; } - FSEditLogOp op = opInstances.get(opCode); + FSEditLogOp op = opInstances.get().get(opCode); if (op == null) { throw new IOException("Read invalid opcode " + opCode); }