Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6D8CB200CD9 for ; Thu, 3 Aug 2017 20:41:37 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6BF2716C41E; Thu, 3 Aug 2017 18:41:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8AB7C16C41C for ; Thu, 3 Aug 2017 20:41:36 +0200 (CEST) Received: (qmail 30045 invoked by uid 500); 3 Aug 2017 18:41:35 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 30034 invoked by uid 99); 3 Aug 2017 18:41:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Aug 2017 18:41:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9C6F0E02F3; Thu, 3 Aug 2017 18:41:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hanm@apache.org To: commits@zookeeper.apache.org Message-Id: <6bcd50a2f25a4ee48fea94542a592beb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: zookeeper git commit: ZOOKEEPER-2853: Update lastZxidSeen in FileTxnLog Date: Thu, 3 Aug 2017 18:41:35 +0000 (UTC) archived-at: Thu, 03 Aug 2017 18:41:37 -0000 Repository: zookeeper Updated Branches: refs/heads/master 69c8cbea1 -> 5c4e44332 ZOOKEEPER-2853: Update lastZxidSeen in FileTxnLog Author: Fangmin Lyu Reviewers: Michael Han , maoling Closes #322 from lvfangmin/ZOOKEEPER-2853 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/5c4e4433 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/5c4e4433 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/5c4e4433 Branch: refs/heads/master Commit: 5c4e44332e55bbf21ca59583f3e8ca97fc4bb266 Parents: 69c8cbe Author: Fangmin Lyu Authored: Thu Aug 3 11:41:30 2017 -0700 Committer: Michael Han Committed: Thu Aug 3 11:41:30 2017 -0700 ---------------------------------------------------------------------- .../server/persistence/FileTxnLog.java | 124 ++++++++++--------- 1 file changed, 63 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/5c4e4433/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java ---------------------------------------------------------------------- diff --git a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java index 9f55ab4..72bb583 100644 --- a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java +++ b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java @@ -53,25 +53,25 @@ import org.slf4j.LoggerFactory; *
  * LogFile:
  *     FileHeader TxnList ZeroPad
- * 
+ *
  * FileHeader: {
  *     magic 4bytes (ZKLG)
  *     version 4bytes
  *     dbid 8bytes
  *   }
- * 
+ *
  * TxnList:
  *     Txn || Txn TxnList
- *     
+ *
  * Txn:
  *     checksum Txnlen TxnHeader Record 0x42
- * 
+ *
  * checksum: 8bytes Adler32 is currently used
  *   calculated across payload -- Txnlen, TxnHeader, Record and 0x42
- * 
+ *
  * Txnlen:
  *     len 4bytes
- * 
+ *
  * TxnHeader: {
  *     sessionid 8bytes
  *     cxid 4bytes
@@ -79,13 +79,13 @@ import org.slf4j.LoggerFactory;
  *     time 8bytes
  *     type 4bytes
  *   }
- *     
+ *
  * Record:
  *     See Jute definition file for details on the various record types
- *      
+ *
  * ZeroPad:
  *     0 padded to EOF (filled during preallocation stage)
- * 
+ * */ public class FileTxnLog implements TxnLog { private static final Logger LOG; @@ -175,7 +175,7 @@ public class FileTxnLog implements TxnLog { /** * close all the open file handles * @throws IOException - */ + */ public synchronized void close() throws IOException { if (logStream != null) { logStream.close(); @@ -184,54 +184,56 @@ public class FileTxnLog implements TxnLog { log.close(); } } - + /** * append an entry to the transaction log * @param hdr the header of the transaction * @param txn the transaction part of the entry - * returns true iff something appended, otw false + * returns true iff something appended, otw false */ public synchronized boolean append(TxnHeader hdr, Record txn) throws IOException { - if (hdr != null) { - if (hdr.getZxid() <= lastZxidSeen) { - LOG.warn("Current zxid " + hdr.getZxid() - + " is <= " + lastZxidSeen + " for " - + hdr.getType()); - } - if (logStream==null) { - if(LOG.isInfoEnabled()){ - LOG.info("Creating new log file: log." + - Long.toHexString(hdr.getZxid())); - } - - logFileWrite = new File(logDir, ("log." + - Long.toHexString(hdr.getZxid()))); - fos = new FileOutputStream(logFileWrite); - logStream=new BufferedOutputStream(fos); - oa = BinaryOutputArchive.getArchive(logStream); - FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); - fhdr.serialize(oa, "fileheader"); - // Make sure that the magic number is written before padding. - logStream.flush(); - currentSize = fos.getChannel().position(); - streamsToFlush.add(fos); - } - padFile(fos); - byte[] buf = Util.marshallTxnEntry(hdr, txn); - if (buf == null || buf.length == 0) { - throw new IOException("Faulty serialization for header " + - "and txn"); - } - Checksum crc = makeChecksumAlgorithm(); - crc.update(buf, 0, buf.length); - oa.writeLong(crc.getValue(), "txnEntryCRC"); - Util.writeTxnBytes(oa, buf); - - return true; + if (hdr == null) { + return false; } - return false; + if (hdr.getZxid() <= lastZxidSeen) { + LOG.warn("Current zxid " + hdr.getZxid() + + " is <= " + lastZxidSeen + " for " + + hdr.getType()); + } else { + lastZxidSeen = hdr.getZxid(); + } + if (logStream==null) { + if(LOG.isInfoEnabled()){ + LOG.info("Creating new log file: log." + + Long.toHexString(hdr.getZxid())); + } + + logFileWrite = new File(logDir, ("log." + + Long.toHexString(hdr.getZxid()))); + fos = new FileOutputStream(logFileWrite); + logStream=new BufferedOutputStream(fos); + oa = BinaryOutputArchive.getArchive(logStream); + FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId); + fhdr.serialize(oa, "fileheader"); + // Make sure that the magic number is written before padding. + logStream.flush(); + currentSize = fos.getChannel().position(); + streamsToFlush.add(fos); + } + padFile(fos); + byte[] buf = Util.marshallTxnEntry(hdr, txn); + if (buf == null || buf.length == 0) { + throw new IOException("Faulty serialization for header " + + "and txn"); + } + Checksum crc = makeChecksumAlgorithm(); + crc.update(buf, 0, buf.length); + oa.writeLong(crc.getValue(), "txnEntryCRC"); + Util.writeTxnBytes(oa, buf); + + return true; } /** @@ -456,10 +458,10 @@ public class FileTxnLog implements TxnLog { } /** - * a class that keeps track of the position + * a class that keeps track of the position * in the input stream. The position points to offset - * that has been consumed by the applications. It can - * wrap buffered input streams to provide the right offset + * that has been consumed by the applications. It can + * wrap buffered input streams to provide the right offset * for the application. */ static class PositionInputStream extends FilterInputStream { @@ -468,7 +470,7 @@ public class FileTxnLog implements TxnLog { super(in); position = 0; } - + @Override public int read() throws IOException { int rc = super.read(); @@ -483,9 +485,9 @@ public class FileTxnLog implements TxnLog { if (rc > 0) { position += rc; } - return rc; + return rc; } - + @Override public int read(byte[] b, int off, int len) throws IOException { int rc = super.read(b, off, len); @@ -494,7 +496,7 @@ public class FileTxnLog implements TxnLog { } return rc; } - + @Override public long skip(long n) throws IOException { long rc = super.skip(n); @@ -522,7 +524,7 @@ public class FileTxnLog implements TxnLog { throw new UnsupportedOperationException("reset"); } } - + /** * this class implements the txnlog iterator interface * which is used for reading the transaction logs @@ -535,7 +537,7 @@ public class FileTxnLog implements TxnLog { File logFile; InputArchive ia; static final String CRC_ERROR="CRC check failed"; - + PositionInputStream inputStream=null; //stored files is the list of files greater than //the zxid we are looking for. @@ -564,7 +566,7 @@ public class FileTxnLog implements TxnLog { } } } - + /** * create an iterator over a transaction database directory * @param logDir the transaction database directory @@ -596,7 +598,7 @@ public class FileTxnLog implements TxnLog { goToNextLog(); next(); } - + /** * Return total storage size of txnlog that will return by this iterator. */ @@ -634,7 +636,7 @@ public class FileTxnLog implements TxnLog { FileHeader header= new FileHeader(); header.deserialize(ia, "fileheader"); if (header.getMagic() != FileTxnLog.TXNLOG_MAGIC) { - throw new IOException("Transaction log: " + this.logFile + " has invalid magic number " + throw new IOException("Transaction log: " + this.logFile + " has invalid magic number " + header.getMagic() + " != " + FileTxnLog.TXNLOG_MAGIC); }