From: drankye@apache.org
To: common-commits@hadoop.apache.org
Date: Sat, 08 Oct 2016 06:10:25 -0000
Subject: [29/50] [abbrv] hadoop git commit: HDFS-10957. Retire BKJM from trunk (Vinayakumar B)

HDFS-10957. Retire BKJM from trunk (Vinayakumar B)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/31195488
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/31195488
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/31195488

Branch: refs/heads/HADOOP-12756
Commit: 311954883f714973784432589896553eb320b597
Parents: 35b9d7d
Author: Vinayakumar B
Authored: Thu Oct 6 19:28:25 2016 +0530
Committer: Vinayakumar B
Committed: Thu Oct 6 19:28:25 2016 +0530

----------------------------------------------------------------------
 .../src/contrib/bkjournal/README.txt            |  66 --
 .../dev-support/findbugsExcludeFile.xml         |   5 -
 .../hadoop-hdfs/src/contrib/bkjournal/pom.xml   | 175 ----
 .../bkjournal/BookKeeperEditLogInputStream.java | 264 -----
 .../BookKeeperEditLogOutputStream.java          | 188 ----
 .../bkjournal/BookKeeperJournalManager.java     | 893 -----------------
 .../contrib/bkjournal/CurrentInprogress.java    | 160 ---
 .../bkjournal/EditLogLedgerMetadata.java        | 217 ----
 .../hadoop/contrib/bkjournal/MaxTxId.java       | 103 --
 .../bkjournal/src/main/proto/bkjournal.proto    |  49 -
 .../hadoop/contrib/bkjournal/BKJMUtil.java      | 184 ----
 .../bkjournal/TestBookKeeperAsHASharedDir.java  | 414 --------
 .../bkjournal/TestBookKeeperConfiguration.java  | 174 ----
 .../bkjournal/TestBookKeeperEditLogStreams.java |  92 --
 .../bkjournal/TestBookKeeperHACheckpoints.java  | 109 --
 .../bkjournal/TestBookKeeperJournalManager.java | 984 -------------------
 .../TestBookKeeperSpeculativeRead.java          | 167 ----
 .../bkjournal/TestBootstrapStandbyWithBKJM.java | 170 ----
 .../bkjournal/TestCurrentInprogress.java        | 160 ---
 .../hdfs/server/namenode/FSEditLogTestUtil.java |  40 -
 .../src/test/resources/log4j.properties         |  55 --
 .../markdown/HDFSHighAvailabilityWithNFS.md     | 114 ---
 hadoop-hdfs-project/pom.xml                     |   1 -
 hadoop-project/pom.xml                          |   6 -
 24 files changed, 4790 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt
deleted file mode 100644
index 7f67226..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt
+++ /dev/null
@@ -1,66 +0,0 @@
-This module provides a BookKeeper backend for HDFS Namenode write
-ahead logging.
-
-BookKeeper is a highly available distributed write ahead logging
-system. For more details, see
-
-    http://zookeeper.apache.org/bookkeeper
-
--------------------------------------------------------------------------------
-How do I build?
-
- To generate the distribution packages for BK journal, do the
- following.
-
-   $ mvn clean package -Pdist
-
- This will generate a jar with all the dependencies needed by the journal
- manager,
-
-   target/hadoop-hdfs-bkjournal-<VERSION>.jar
-
- Note that the -Pdist part of the build command is important, as otherwise
- the dependencies would not be packaged in the jar.
-
--------------------------------------------------------------------------------
-How do I use the BookKeeper Journal?
-
- To run an HDFS namenode using BookKeeper as a backend, copy the bkjournal
- jar, generated above, into the lib directory of hdfs. In the standard
- distribution of HDFS, this is at $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/
-
-   cp target/hadoop-hdfs-bkjournal-<VERSION>.jar \
-     $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/
-
- Then, in hdfs-site.xml, set the following properties.
-
-   <property>
-     <name>dfs.namenode.edits.dir</name>
-     <value>bookkeeper://localhost:2181/bkjournal,file:///path/for/edits</value>
-   </property>
-
-   <property>
-     <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
-     <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
-   </property>
-
- In this example, the namenode is configured to use 2 write ahead
- logging devices. One writes to BookKeeper and the other to a local
- file system. At the moment it is not possible to write only to
- BookKeeper, as the resource checker currently checks explicitly for
- local disks.
-
- The given example configures the namenode to look for the journal
- metadata at the path /bkjournal on a standalone zookeeper ensemble
- at localhost:2181. To configure a multiple host zookeeper ensemble,
- separate the hosts with semicolons. For example, if you have 3
- zookeeper servers, zk1, zk2 & zk3, each listening on port 2181, you
- would specify this with
-
-   bookkeeper://zk1:2181;zk2:2181;zk3:2181/bkjournal
-
- The final part /bkjournal specifies the znode in zookeeper where
- ledger metadata will be stored. Administrators can set this to anything
- they wish.
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml
deleted file mode 100644
index 45c3a75..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml
+++ /dev/null
@@ -1,5 +0,0 @@
- - - - -

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
deleted file mode 100644
index 7fb6c24..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
+++ /dev/null
@@ -1,175 +0,0 @@
- - - - 4.0.0 - - org.apache.hadoop - hadoop-project - 3.0.0-alpha2-SNAPSHOT - ../../../../../hadoop-project - - - org.apache.hadoop.contrib - hadoop-hdfs-bkjournal - 3.0.0-alpha2-SNAPSHOT - Apache Hadoop HDFS BookKeeper Journal - Apache Hadoop HDFS BookKeeper Journal - jar - - - hdfs - ${basedir}/../../../../../hadoop-common-project/hadoop-common/target - - - - - commons-logging - commons-logging - compile - - - org.apache.hadoop - hadoop-common - provided - - - org.apache.hadoop - hadoop-hdfs - provided - - - org.apache.hadoop - hadoop-hdfs - test-jar - test - - - org.apache.hadoop - hadoop-common - test-jar - test - - - org.apache.bookkeeper - bookkeeper-server - compile - - - org.apache.zookeeper - zookeeper - compile - - - com.google.guava - guava - compile - - - junit - junit - test - - - org.mockito - mockito-all - test - - - - - - org.apache.hadoop - hadoop-maven-plugins - - - compile-protoc - generate-sources - - protoc - - - ${protobuf.version} - ${protoc.path} - - ${basedir}/../../../../../hadoop-common-project/hadoop-common/src/main/proto - ${basedir}/../../../../../hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto -
${basedir}/../../../../../hadoop-hdfs-project/hadoop-hdfs/src/main/proto - ${basedir}/src/main/proto - - - ${basedir}/src/main/proto - - bkjournal.proto - - - ${project.build.directory}/generated-sources/java - - - - - - org.codehaus.mojo - findbugs-maven-plugin - - ${basedir}/dev-support/findbugsExcludeFile.xml - - - - org.apache.rat - apache-rat-plugin - - - dev-support/findbugsExcludeFile.xml - - - - - - - - dist - - - - maven-dependency-plugin - 2.8 - - - dist - package - - copy - - - - - org.apache.bookkeeper - bookkeeper-server - jar - - - ${project.build.directory}/lib - - - - - - - - - http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java deleted file mode 100644 index 86da807..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java +++ /dev/null @@ -1,264 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.contrib.bkjournal; - -import java.io.BufferedInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Enumeration; - -import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.BKException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * Input stream which reads from a BookKeeper ledger. - */ -class BookKeeperEditLogInputStream extends EditLogInputStream { - static final Log LOG = LogFactory.getLog(BookKeeperEditLogInputStream.class); - - private final long firstTxId; - private final long lastTxId; - private final int logVersion; - private final boolean inProgress; - private final LedgerHandle lh; - - private final FSEditLogOp.Reader reader; - private final FSEditLogLoader.PositionTrackingInputStream tracker; - - /** - * Construct BookKeeper edit log input stream. - * Starts reading from the first entry of the ledger. 
- */ - BookKeeperEditLogInputStream(final LedgerHandle lh, - final EditLogLedgerMetadata metadata) - throws IOException { - this(lh, metadata, 0); - } - - /** - * Construct BookKeeper edit log input stream. - * Starts reading from firstBookKeeperEntry. This allows the stream - * to take a shortcut during recovery, as it doesn't have to read - * every edit log transaction to find out what the last one is. - */ - BookKeeperEditLogInputStream(LedgerHandle lh, EditLogLedgerMetadata metadata, - long firstBookKeeperEntry) - throws IOException { - this.lh = lh; - this.firstTxId = metadata.getFirstTxId(); - this.lastTxId = metadata.getLastTxId(); - this.logVersion = metadata.getDataLayoutVersion(); - this.inProgress = metadata.isInProgress(); - - if (firstBookKeeperEntry < 0 - || firstBookKeeperEntry > lh.getLastAddConfirmed()) { - throw new IOException("Invalid first bk entry to read: " - + firstBookKeeperEntry + ", LAC: " + lh.getLastAddConfirmed()); - } - BufferedInputStream bin = new BufferedInputStream( - new LedgerInputStream(lh, firstBookKeeperEntry)); - tracker = new FSEditLogLoader.PositionTrackingInputStream(bin); - DataInputStream in = new DataInputStream(tracker); - - reader = FSEditLogOp.Reader.create(in, tracker, logVersion); - } - - @Override - public long getFirstTxId() { - return firstTxId; - } - - @Override - public long getLastTxId() { - return lastTxId; - } - - @Override - public int getVersion(boolean verifyVersion) throws IOException { - return logVersion; - } - - @Override - protected FSEditLogOp nextOp() throws IOException { - return reader.readOp(false); - } - - @Override - public void close() throws IOException { - try { - lh.close(); - } catch (BKException e) { - throw new IOException("Exception closing ledger", e); - } catch (InterruptedException e) { - throw new IOException("Interrupted closing ledger", e); - } - } - - @Override - public long getPosition() { - return tracker.getPos(); - } - - @Override - public long length() throws IOException { - return lh.getLength(); - } - - @Override - public String getName() { - return String.format( - "BookKeeperLedger[ledgerId=%d,firstTxId=%d,lastTxId=%d]", lh.getId(), - firstTxId, lastTxId); - } - - @Override - public boolean isInProgress() { - return inProgress; - } - - /** - * Skip forward to specified transaction id. - * Currently we do this by just iterating forward. 
- * If this proves to be too expensive, this can be reimplemented - * with a binary search over bk entries - */ - public void skipTo(long txId) throws IOException { - long numToSkip = getFirstTxId() - txId; - - FSEditLogOp op = null; - for (long i = 0; i < numToSkip; i++) { - op = readOp(); - } - if (op != null && op.getTransactionId() != txId-1) { - throw new IOException("Corrupt stream, expected txid " - + (txId-1) + ", got " + op.getTransactionId()); - } - } - - @Override - public String toString() { - return ("BookKeeperEditLogInputStream {" + this.getName() + "}"); - } - - @Override - public void setMaxOpSize(int maxOpSize) { - reader.setMaxOpSize(maxOpSize); - } - - @Override - public boolean isLocalLog() { - return false; - } - - /** - * Input stream implementation which can be used by - * FSEditLogOp.Reader - */ - private static class LedgerInputStream extends InputStream { - private long readEntries; - private InputStream entryStream = null; - private final LedgerHandle lh; - private final long maxEntry; - - /** - * Construct ledger input stream - * @param lh the ledger handle to read from - * @param firstBookKeeperEntry ledger entry to start reading from - */ - LedgerInputStream(LedgerHandle lh, long firstBookKeeperEntry) - throws IOException { - this.lh = lh; - readEntries = firstBookKeeperEntry; - - maxEntry = lh.getLastAddConfirmed(); - } - - /** - * Get input stream representing next entry in the - * ledger. - * @return input stream, or null if no more entries - */ - private InputStream nextStream() throws IOException { - try { - if (readEntries > maxEntry) { - return null; - } - Enumeration entries - = lh.readEntries(readEntries, readEntries); - readEntries++; - if (entries.hasMoreElements()) { - LedgerEntry e = entries.nextElement(); - assert !entries.hasMoreElements(); - return e.getEntryInputStream(); - } - } catch (BKException e) { - throw new IOException("Error reading entries from bookkeeper", e); - } catch (InterruptedException e) { - throw new IOException("Interrupted reading entries from bookkeeper", e); - } - return null; - } - - @Override - public int read() throws IOException { - byte[] b = new byte[1]; - if (read(b, 0, 1) != 1) { - return -1; - } else { - return b[0]; - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - try { - int read = 0; - if (entryStream == null) { - entryStream = nextStream(); - if (entryStream == null) { - return read; - } - } - - while (read < len) { - int thisread = entryStream.read(b, off+read, (len-read)); - if (thisread == -1) { - entryStream = nextStream(); - if (entryStream == null) { - return read; - } - } else { - read += thisread; - } - } - return read; - } catch (IOException e) { - throw e; - } - - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java deleted file mode 100644 index 865806b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java +++ /dev/null @@ -1,188 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.contrib.bkjournal; - -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.CountDownLatch; - -import java.util.Arrays; - -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.AsyncCallback.AddCallback; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer; - -import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; -import org.apache.hadoop.io.DataOutputBuffer; -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * Output stream for BookKeeper Journal. - * Multiple complete edit log entries are packed into a single bookkeeper - * entry before sending it over the network. The fact that the edit log entries - * are complete in the bookkeeper entries means that each bookkeeper log entry - *can be read as a complete edit log. This is useful for recover, as we don't - * need to read through the entire edit log segment to get the last written - * entry. - */ -class BookKeeperEditLogOutputStream - extends EditLogOutputStream implements AddCallback { - static final Log LOG = LogFactory.getLog(BookKeeperEditLogOutputStream.class); - - private final DataOutputBuffer bufCurrent; - private final AtomicInteger outstandingRequests; - private final int transmissionThreshold; - private final LedgerHandle lh; - private CountDownLatch syncLatch; - private final AtomicInteger transmitResult - = new AtomicInteger(BKException.Code.OK); - private final Writer writer; - - /** - * Construct an edit log output stream which writes to a ledger. 
- - */ - protected BookKeeperEditLogOutputStream(Configuration conf, LedgerHandle lh) - throws IOException { - super(); - - bufCurrent = new DataOutputBuffer(); - outstandingRequests = new AtomicInteger(0); - syncLatch = null; - this.lh = lh; - this.writer = new Writer(bufCurrent); - this.transmissionThreshold - = conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE, - BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE_DEFAULT); - } - - @Override - public void create(int layoutVersion) throws IOException { - // noop - } - - @Override - public void close() throws IOException { - setReadyToFlush(); - flushAndSync(true); - try { - lh.close(); - } catch (InterruptedException ie) { - throw new IOException("Interrupted waiting on close", ie); - } catch (BKException bke) { - throw new IOException("BookKeeper error during close", bke); - } - } - - @Override - public void abort() throws IOException { - try { - lh.close(); - } catch (InterruptedException ie) { - throw new IOException("Interrupted waiting on close", ie); - } catch (BKException bke) { - throw new IOException("BookKeeper error during abort", bke); - } - - } - - @Override - public void writeRaw(final byte[] data, int off, int len) throws IOException { - throw new IOException("Not supported for BK"); - } - - @Override - public void write(FSEditLogOp op) throws IOException { - writer.writeOp(op); - - if (bufCurrent.getLength() > transmissionThreshold) { - transmit(); - } - } - - @Override - public void setReadyToFlush() throws IOException { - transmit(); - - synchronized (this) { - syncLatch = new CountDownLatch(outstandingRequests.get()); - } - } - - @Override - public void flushAndSync(boolean durable) throws IOException { - assert(syncLatch != null); - try { - syncLatch.await(); - } catch (InterruptedException ie) { - throw new IOException("Interrupted waiting on latch", ie); - } - if (transmitResult.get() != BKException.Code.OK) { - throw new IOException("Failed to write to bookkeeper; Error is (" - + transmitResult.get() + ") " - + BKException.getMessage(transmitResult.get())); - } - - syncLatch = null; - // wait for whatever we wait on - } - - /** - * Transmit the current buffer to bookkeeper. - * Synchronised at the FSEditLog level. #write() and #setReadyToFlush() - * are never called at the same time. 
- */ - private void transmit() throws IOException { - if (!transmitResult.compareAndSet(BKException.Code.OK, - BKException.Code.OK)) { - throw new IOException("Trying to write to an errored stream;" - + " Error code : (" + transmitResult.get() - + ") " + BKException.getMessage(transmitResult.get())); - } - if (bufCurrent.getLength() > 0) { - byte[] entry = Arrays.copyOf(bufCurrent.getData(), - bufCurrent.getLength()); - lh.asyncAddEntry(entry, this, null); - bufCurrent.reset(); - outstandingRequests.incrementAndGet(); - } - } - - @Override - public void addComplete(int rc, LedgerHandle handle, - long entryId, Object ctx) { - synchronized(this) { - outstandingRequests.decrementAndGet(); - if (!transmitResult.compareAndSet(BKException.Code.OK, rc)) { - LOG.warn("Tried to set transmit result to (" + rc + ") \"" - + BKException.getMessage(rc) + "\"" - + " but is already (" + transmitResult.get() + ") \"" - + BKException.getMessage(transmitResult.get()) + "\""); - } - CountDownLatch l = syncLatch; - if (l != null) { - l.countDown(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java deleted file mode 100644 index 8e4d032..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java +++ /dev/null @@ -1,893 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.contrib.bkjournal; - -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.common.Storage; -import org.apache.hadoop.hdfs.server.common.StorageInfo; -import org.apache.hadoop.hdfs.server.namenode.JournalManager; -import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; -import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; -import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.apache.hadoop.conf.Configuration; - -import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.util.ZkUtils; - -import org.apache.zookeeper.data.Stat; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.AsyncCallback.StringCallback; -import org.apache.zookeeper.ZKUtil; - -import java.util.Collection; -import java.util.Collections; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.io.IOException; - -import java.net.URI; - -import org.apache.hadoop.hdfs.protocolPB.PBHelper; -import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.VersionProto; -import com.google.protobuf.TextFormat; -import static com.google.common.base.Charsets.UTF_8; - -import org.apache.commons.io.Charsets; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import com.google.common.annotations.VisibleForTesting; -/** - * BookKeeper Journal Manager - * - * To use, add the following to hdfs-site.xml. - *
- * {@code
- * <property>
- *   <name>dfs.namenode.edits.dir</name>
- *   <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal</value>
- * </property>
- *
- * <property>
- *   <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
- *   <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
- * </property>
- * }
- * 
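- *
- * A minimal programmatic equivalent of the XML above (editor's sketch,
- * not part of the original javadoc; the NamespaceInfo argument, here
- * nsInfo, is assumed to come from the running NameNode, and the
- * constructor may throw IOException):
- * <pre>
- * {@code
- * Configuration conf = new Configuration();
- * conf.set("dfs.namenode.edits.dir",
- *     "bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal");
- * conf.set("dfs.namenode.edits.journal-plugin.bookkeeper",
- *     BookKeeperJournalManager.class.getName());
- * JournalManager jm = new BookKeeperJournalManager(conf,
- *     URI.create("bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal"),
- *     nsInfo);
- * }
- * </pre>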
- * The URI format for bookkeeper is bookkeeper://[zkEnsemble]/[rootZnode].
- * [zkEnsemble] is a semicolon-separated list of zookeeper host:port
- * pairs. In the example above there are 3 servers in the ensemble:
- * zk1, zk2 & zk3, each listening on port 2181.
- *
- * [rootZnode] is the path of the zookeeper znode under which the editlog
- * information will be stored.
- *
- * Other configuration options are:
- * <ul>
- * <li>dfs.namenode.bookkeeperjournal.output-buffer-size
- *     Number of bytes a bookkeeper journal stream will buffer before
- *     forcing a flush. Default is 1024.</li>
- *
- * <li>dfs.namenode.bookkeeperjournal.ensemble-size
- *     Number of bookkeeper servers in edit log ledger ensembles. This
- *     is the number of bookkeeper servers which need to be available
- *     for the ledger to be writable. Default is 3.</li>
- *
- * <li>dfs.namenode.bookkeeperjournal.quorum-size
- *     Number of bookkeeper servers in the write quorum. This is the
- *     number of bookkeeper servers which must have acknowledged the
- *     write of an entry before it is considered written. Default is 2.</li>
- *
- * <li>dfs.namenode.bookkeeperjournal.digestPw
- *     Password to use when creating ledgers.</li>
- *
- * <li>dfs.namenode.bookkeeperjournal.zk.session.timeout
- *     Session timeout for the ZooKeeper client used by the BookKeeper
- *     Journal Manager. Hadoop recommends that this value be less than the
- *     ZKFC session timeout. Default is 3000 (milliseconds).</li>
- * </ul>
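- *
- * A minimal reading sketch (editor's addition, not in the original file;
- * assumes an existing Configuration named conf). The defaults mirror the
- * BKJM_*_DEFAULT constants defined below:
- * <pre>
- * {@code
- * int outputBufferSize =
- *     conf.getInt("dfs.namenode.bookkeeperjournal.output-buffer-size", 1024);
- * int ensembleSize =
- *     conf.getInt("dfs.namenode.bookkeeperjournal.ensemble-size", 3);
- * int quorumSize =
- *     conf.getInt("dfs.namenode.bookkeeperjournal.quorum-size", 2);
- * int zkSessionTimeoutMs =
- *     conf.getInt("dfs.namenode.bookkeeperjournal.zk.session.timeout", 3000);
- * }
- * </pre>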
- */ -public class BookKeeperJournalManager implements JournalManager { - static final Log LOG = LogFactory.getLog(BookKeeperJournalManager.class); - - public static final String BKJM_OUTPUT_BUFFER_SIZE - = "dfs.namenode.bookkeeperjournal.output-buffer-size"; - public static final int BKJM_OUTPUT_BUFFER_SIZE_DEFAULT = 1024; - - public static final String BKJM_BOOKKEEPER_ENSEMBLE_SIZE - = "dfs.namenode.bookkeeperjournal.ensemble-size"; - public static final int BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3; - - public static final String BKJM_BOOKKEEPER_QUORUM_SIZE - = "dfs.namenode.bookkeeperjournal.quorum-size"; - public static final int BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT = 2; - - public static final String BKJM_BOOKKEEPER_DIGEST_PW - = "dfs.namenode.bookkeeperjournal.digestPw"; - public static final String BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT = ""; - - private static final int BKJM_LAYOUT_VERSION = -1; - - public static final String BKJM_ZK_SESSION_TIMEOUT - = "dfs.namenode.bookkeeperjournal.zk.session.timeout"; - public static final int BKJM_ZK_SESSION_TIMEOUT_DEFAULT = 3000; - - private static final String BKJM_EDIT_INPROGRESS = "inprogress_"; - - public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH - = "dfs.namenode.bookkeeperjournal.zk.availablebookies"; - - public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT - = "/ledgers/available"; - - public static final String BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS - = "dfs.namenode.bookkeeperjournal.speculativeReadTimeoutMs"; - public static final int BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT - = 2000; - - public static final String BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC - = "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec"; - public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5; - - public static final String BKJM_BOOKKEEPER_ACK_QUORUM_SIZE - = "dfs.namenode.bookkeeperjournal.ack.quorum-size"; - - public static final String BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC - = "dfs.namenode.bookkeeperjournal.addEntryTimeoutSec"; - public static final int BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT = 5; - - private ZooKeeper zkc; - private final Configuration conf; - private final BookKeeper bkc; - private final CurrentInprogress ci; - private final String basePath; - private final String ledgerPath; - private final String versionPath; - private final MaxTxId maxTxId; - private final int ensembleSize; - private final int quorumSize; - private final int ackQuorumSize; - private final int addEntryTimeout; - private final String digestpw; - private final int speculativeReadTimeout; - private final int readEntryTimeout; - private final CountDownLatch zkConnectLatch; - private final NamespaceInfo nsInfo; - private boolean initialized = false; - private LedgerHandle currentLedger = null; - - /** - * Construct a Bookkeeper journal manager. 
- */ - public BookKeeperJournalManager(Configuration conf, URI uri, - NamespaceInfo nsInfo) throws IOException { - this.conf = conf; - this.nsInfo = nsInfo; - - String zkConnect = uri.getAuthority().replace(";", ","); - basePath = uri.getPath(); - ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE, - BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT); - quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE, - BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT); - ackQuorumSize = conf.getInt(BKJM_BOOKKEEPER_ACK_QUORUM_SIZE, quorumSize); - addEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC, - BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT); - speculativeReadTimeout = conf.getInt( - BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS, - BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT); - readEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC, - BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT); - - ledgerPath = basePath + "/ledgers"; - String maxTxIdPath = basePath + "/maxtxid"; - String currentInprogressNodePath = basePath + "/CurrentInprogress"; - versionPath = basePath + "/version"; - digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW, - BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT); - - try { - zkConnectLatch = new CountDownLatch(1); - int bkjmZKSessionTimeout = conf.getInt(BKJM_ZK_SESSION_TIMEOUT, - BKJM_ZK_SESSION_TIMEOUT_DEFAULT); - zkc = new ZooKeeper(zkConnect, bkjmZKSessionTimeout, - new ZkConnectionWatcher()); - // Configured zk session timeout + some extra grace period (here - // BKJM_ZK_SESSION_TIMEOUT_DEFAULT used as grace period) - int zkConnectionLatchTimeout = bkjmZKSessionTimeout - + BKJM_ZK_SESSION_TIMEOUT_DEFAULT; - if (!zkConnectLatch - .await(zkConnectionLatchTimeout, TimeUnit.MILLISECONDS)) { - throw new IOException("Error connecting to zookeeper"); - } - - prepareBookKeeperEnv(); - ClientConfiguration clientConf = new ClientConfiguration(); - clientConf.setSpeculativeReadTimeout(speculativeReadTimeout); - clientConf.setReadEntryTimeout(readEntryTimeout); - clientConf.setAddEntryTimeout(addEntryTimeout); - bkc = new BookKeeper(clientConf, zkc); - } catch (KeeperException e) { - throw new IOException("Error initializing zk", e); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while initializing bk journal manager", - ie); - } - - ci = new CurrentInprogress(zkc, currentInprogressNodePath); - maxTxId = new MaxTxId(zkc, maxTxIdPath); - } - - /** - * Pre-creating bookkeeper metadata path in zookeeper. 
- */ - private void prepareBookKeeperEnv() throws IOException { - // create bookie available path in zookeeper if it doesn't exists - final String zkAvailablePath = conf.get(BKJM_ZK_LEDGERS_AVAILABLE_PATH, - BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT); - final CountDownLatch zkPathLatch = new CountDownLatch(1); - - final AtomicBoolean success = new AtomicBoolean(false); - StringCallback callback = new StringCallback() { - @Override - public void processResult(int rc, String path, Object ctx, String name) { - if (KeeperException.Code.OK.intValue() == rc - || KeeperException.Code.NODEEXISTS.intValue() == rc) { - LOG.info("Successfully created bookie available path : " - + zkAvailablePath); - success.set(true); - } else { - KeeperException.Code code = KeeperException.Code.get(rc); - LOG.error("Error : " - + KeeperException.create(code, path).getMessage() - + ", failed to create bookie available path : " - + zkAvailablePath); - } - zkPathLatch.countDown(); - } - }; - ZkUtils.asyncCreateFullPathOptimistic(zkc, zkAvailablePath, new byte[0], - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null); - - try { - if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS) - || !success.get()) { - throw new IOException("Couldn't create bookie available path :" - + zkAvailablePath + ", timed out " + zkc.getSessionTimeout() - + " millis"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException( - "Interrupted when creating the bookie available path : " - + zkAvailablePath, e); - } - } - - @Override - public void format(NamespaceInfo ns) throws IOException { - try { - // delete old info - Stat baseStat = null; - Stat ledgerStat = null; - if ((baseStat = zkc.exists(basePath, false)) != null) { - if ((ledgerStat = zkc.exists(ledgerPath, false)) != null) { - for (EditLogLedgerMetadata l : getLedgerList(true)) { - try { - bkc.deleteLedger(l.getLedgerId()); - } catch (BKException.BKNoSuchLedgerExistsException bke) { - LOG.warn("Ledger " + l.getLedgerId() + " does not exist;" - + " Cannot delete."); - } - } - } - ZKUtil.deleteRecursive(zkc, basePath); - } - - // should be clean now. 
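// (Editor's note, not in the original file: from here format() rebuilds the
// journal's znode skeleton: the base znode, a version znode carrying the
// serialized NamespaceInfo and layout version, and the parent znode under
// which edit log ledger metadata is kept.)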
- zkc.create(basePath, new byte[] {'0'}, - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - VersionProto.Builder builder = VersionProto.newBuilder(); - builder.setNamespaceInfo(PBHelper.convert(ns)) - .setLayoutVersion(BKJM_LAYOUT_VERSION); - - byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8); - zkc.create(versionPath, data, - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - zkc.create(ledgerPath, new byte[] {'0'}, - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } catch (KeeperException ke) { - LOG.error("Error accessing zookeeper to format", ke); - throw new IOException("Error accessing zookeeper to format", ke); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted during format", ie); - } catch (BKException bke) { - throw new IOException("Error cleaning up ledgers during format", bke); - } - } - - @Override - public boolean hasSomeData() throws IOException { - try { - return zkc.exists(basePath, false) != null; - } catch (KeeperException ke) { - throw new IOException("Couldn't contact zookeeper", ke); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while checking for data", ie); - } - } - - synchronized private void checkEnv() throws IOException { - if (!initialized) { - try { - Stat versionStat = zkc.exists(versionPath, false); - if (versionStat == null) { - throw new IOException("Environment not initialized. " - +"Have you forgotten to format?"); - } - byte[] d = zkc.getData(versionPath, false, versionStat); - - VersionProto.Builder builder = VersionProto.newBuilder(); - TextFormat.merge(new String(d, UTF_8), builder); - if (!builder.isInitialized()) { - throw new IOException("Invalid/Incomplete data in znode"); - } - VersionProto vp = builder.build(); - - // There's only one version at the moment - assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION; - - NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo()); - - if (nsInfo.getNamespaceID() != readns.getNamespaceID() || - !nsInfo.clusterID.equals(readns.getClusterID()) || - !nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) { - String err = String.format("Environment mismatch. Running process %s" - +", stored in ZK %s", nsInfo, readns); - LOG.error(err); - throw new IOException(err); - } - - ci.init(); - initialized = true; - } catch (KeeperException ke) { - throw new IOException("Cannot access ZooKeeper", ke); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while checking environment", ie); - } - } - } - - /** - * Start a new log segment in a BookKeeper ledger. - * First ensure that we have the write lock for this journal. - * Then create a ledger and stream based on that ledger. - * The ledger id is written to the inprogress znode, so that in the - * case of a crash, a recovery process can find the ledger we were writing - * to when we crashed. - * @param txId First transaction id to be written to the stream - */ - @Override - public EditLogOutputStream startLogSegment(long txId, int layoutVersion) - throws IOException { - checkEnv(); - - if (txId <= maxTxId.get()) { - throw new IOException("We've already seen " + txId - + ". 
A new stream cannot be created with it"); - } - - try { - String existingInprogressNode = ci.read(); - if (null != existingInprogressNode - && zkc.exists(existingInprogressNode, false) != null) { - throw new IOException("Inprogress node already exists"); - } - if (currentLedger != null) { - // bookkeeper errored on last stream, clean up ledger - currentLedger.close(); - } - currentLedger = bkc.createLedger(ensembleSize, quorumSize, ackQuorumSize, - BookKeeper.DigestType.MAC, - digestpw.getBytes(Charsets.UTF_8)); - } catch (BKException bke) { - throw new IOException("Error creating ledger", bke); - } catch (KeeperException ke) { - throw new IOException("Error in zookeeper while creating ledger", ke); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted creating ledger", ie); - } - - try { - String znodePath = inprogressZNode(txId); - EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath, - layoutVersion, currentLedger.getId(), txId); - /* Write the ledger metadata out to the inprogress ledger znode - * This can fail if for some reason our write lock has - * expired (@see WriteLock) and another process has managed to - * create the inprogress znode. - * In this case, throw an exception. We don't want to continue - * as this would lead to a split brain situation. - */ - l.write(zkc, znodePath); - - maxTxId.store(txId); - ci.update(znodePath); - return new BookKeeperEditLogOutputStream(conf, currentLedger); - } catch (KeeperException ke) { - cleanupLedger(currentLedger); - throw new IOException("Error storing ledger metadata", ke); - } - } - - private void cleanupLedger(LedgerHandle lh) { - try { - long id = currentLedger.getId(); - currentLedger.close(); - bkc.deleteLedger(id); - } catch (BKException bke) { - //log & ignore, an IOException will be thrown soon - LOG.error("Error closing ledger", bke); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - LOG.warn("Interrupted while closing ledger", ie); - } - } - - - - /** - * Finalize a log segment. If the journal manager is currently - * writing to a ledger, ensure that this is the ledger of the log segment - * being finalized. - * - * Otherwise this is the recovery case. In the recovery case, ensure that - * the firstTxId of the ledger matches firstTxId for the segment we are - * trying to finalize. - */ - @Override - public void finalizeLogSegment(long firstTxId, long lastTxId) - throws IOException { - checkEnv(); - - String inprogressPath = inprogressZNode(firstTxId); - try { - Stat inprogressStat = zkc.exists(inprogressPath, false); - if (inprogressStat == null) { - throw new IOException("Inprogress znode " + inprogressPath - + " doesn't exist"); - } - - EditLogLedgerMetadata l - = EditLogLedgerMetadata.read(zkc, inprogressPath); - - if (currentLedger != null) { // normal, non-recovery case - if (l.getLedgerId() == currentLedger.getId()) { - try { - currentLedger.close(); - } catch (BKException bke) { - LOG.error("Error closing current ledger", bke); - } - currentLedger = null; - } else { - throw new IOException( - "Active ledger has different ID to inprogress. 
" - + l.getLedgerId() + " found, " - + currentLedger.getId() + " expected"); - } - } - - if (l.getFirstTxId() != firstTxId) { - throw new IOException("Transaction id not as expected, " - + l.getFirstTxId() + " found, " + firstTxId + " expected"); - } - - l.finalizeLedger(lastTxId); - String finalisedPath = finalizedLedgerZNode(firstTxId, lastTxId); - try { - l.write(zkc, finalisedPath); - } catch (KeeperException.NodeExistsException nee) { - if (!l.verify(zkc, finalisedPath)) { - throw new IOException("Node " + finalisedPath + " already exists" - + " but data doesn't match"); - } - } - maxTxId.store(lastTxId); - zkc.delete(inprogressPath, inprogressStat.getVersion()); - String inprogressPathFromCI = ci.read(); - if (inprogressPath.equals(inprogressPathFromCI)) { - ci.clear(); - } - } catch (KeeperException e) { - throw new IOException("Error finalising ledger", e); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Error finalising ledger", ie); - } - } - - public void selectInputStreams( - Collection streams, - long fromTxnId, boolean inProgressOk) throws IOException { - selectInputStreams(streams, fromTxnId, inProgressOk, false); - } - - @Override - public void selectInputStreams(Collection streams, - long fromTxId, boolean inProgressOk, boolean onlyDurableTxns) - throws IOException { - List currentLedgerList = getLedgerList(fromTxId, - inProgressOk); - try { - BookKeeperEditLogInputStream elis = null; - for (EditLogLedgerMetadata l : currentLedgerList) { - long lastTxId = l.getLastTxId(); - if (l.isInProgress()) { - lastTxId = recoverLastTxId(l, false); - } - // Check once again, required in case of InProgress and is case of any - // gap. - if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) { - LedgerHandle h; - if (l.isInProgress()) { // we don't want to fence the current journal - h = bkc.openLedgerNoRecovery(l.getLedgerId(), - BookKeeper.DigestType.MAC, digestpw.getBytes(Charsets.UTF_8)); - } else { - h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC, - digestpw.getBytes(Charsets.UTF_8)); - } - elis = new BookKeeperEditLogInputStream(h, l); - elis.skipTo(fromTxId); - } else { - // If mismatches then there might be some gap, so we should not check - // further. - return; - } - streams.add(elis); - if (elis.getLastTxId() == HdfsServerConstants.INVALID_TXID) { - return; - } - fromTxId = elis.getLastTxId() + 1; - } - } catch (BKException e) { - throw new IOException("Could not open ledger for " + fromTxId, e); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted opening ledger for " + fromTxId, ie); - } - } - - long getNumberOfTransactions(long fromTxId, boolean inProgressOk) - throws IOException { - long count = 0; - long expectedStart = 0; - for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) { - long lastTxId = l.getLastTxId(); - if (l.isInProgress()) { - lastTxId = recoverLastTxId(l, false); - if (lastTxId == HdfsServerConstants.INVALID_TXID) { - break; - } - } - - assert lastTxId >= l.getFirstTxId(); - - if (lastTxId < fromTxId) { - continue; - } else if (l.getFirstTxId() <= fromTxId && lastTxId >= fromTxId) { - // we can start in the middle of a segment - count = (lastTxId - l.getFirstTxId()) + 1; - expectedStart = lastTxId + 1; - } else { - if (expectedStart != l.getFirstTxId()) { - if (count == 0) { - throw new CorruptionException("StartTxId " + l.getFirstTxId() - + " is not as expected " + expectedStart - + ". 
Gap in transaction log?"); - } else { - break; - } - } - count += (lastTxId - l.getFirstTxId()) + 1; - expectedStart = lastTxId + 1; - } - } - return count; - } - - @Override - public void recoverUnfinalizedSegments() throws IOException { - checkEnv(); - - synchronized (this) { - try { - List children = zkc.getChildren(ledgerPath, false); - for (String child : children) { - if (!child.startsWith(BKJM_EDIT_INPROGRESS)) { - continue; - } - String znode = ledgerPath + "/" + child; - EditLogLedgerMetadata l = EditLogLedgerMetadata.read(zkc, znode); - try { - long endTxId = recoverLastTxId(l, true); - if (endTxId == HdfsServerConstants.INVALID_TXID) { - LOG.error("Unrecoverable corruption has occurred in segment " - + l.toString() + " at path " + znode - + ". Unable to continue recovery."); - throw new IOException("Unrecoverable corruption," - + " please check logs."); - } - finalizeLogSegment(l.getFirstTxId(), endTxId); - } catch (SegmentEmptyException see) { - LOG.warn("Inprogress znode " + child - + " refers to a ledger which is empty. This occurs when the NN" - + " crashes after opening a segment, but before writing the" - + " OP_START_LOG_SEGMENT op. It is safe to delete." - + " MetaData [" + l.toString() + "]"); - - // If the max seen transaction is the same as what would - // have been the first transaction of the failed ledger, - // decrement it, as that transaction never happened and as - // such, is _not_ the last seen - if (maxTxId.get() == l.getFirstTxId()) { - maxTxId.reset(maxTxId.get() - 1); - } - - zkc.delete(znode, -1); - } - } - } catch (KeeperException.NoNodeException nne) { - // nothing to recover, ignore - } catch (KeeperException ke) { - throw new IOException("Couldn't get list of inprogress segments", ke); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted getting list of inprogress segments", - ie); - } - } - } - - @Override - public void purgeLogsOlderThan(long minTxIdToKeep) - throws IOException { - checkEnv(); - - for (EditLogLedgerMetadata l : getLedgerList(false)) { - if (l.getLastTxId() < minTxIdToKeep) { - try { - Stat stat = zkc.exists(l.getZkPath(), false); - zkc.delete(l.getZkPath(), stat.getVersion()); - bkc.deleteLedger(l.getLedgerId()); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - LOG.error("Interrupted while purging " + l, ie); - } catch (BKException bke) { - LOG.error("Couldn't delete ledger from bookkeeper", bke); - } catch (KeeperException ke) { - LOG.error("Error deleting ledger entry in zookeeper", ke); - } - } - } - } - - @Override - public void doPreUpgrade() throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void doUpgrade(Storage storage) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public long getJournalCTime() throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void doFinalize() throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage, - int targetLayoutVersion) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void doRollback() throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void discardSegments(long startTxId) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws IOException { - try { - 
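// (Editor's note, not in the original file: close() releases the BookKeeper
// client first, then the ZooKeeper handle it was created with; failures from
// either are rethrown as IOException by the catch blocks below.)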
bkc.close(); - zkc.close(); - } catch (BKException bke) { - throw new IOException("Couldn't close bookkeeper client", bke); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while closing journal manager", ie); - } - } - - /** - * Set the amount of memory that this stream should use to buffer edits. - * Setting this will only affect future output stream. Streams - * which have currently be created won't be affected. - */ - @Override - public void setOutputBufferCapacity(int size) { - conf.getInt(BKJM_OUTPUT_BUFFER_SIZE, size); - } - - /** - * Find the id of the last edit log transaction writen to a edit log - * ledger. - */ - private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence) - throws IOException, SegmentEmptyException { - LedgerHandle lh = null; - try { - if (fence) { - lh = bkc.openLedger(l.getLedgerId(), - BookKeeper.DigestType.MAC, - digestpw.getBytes(Charsets.UTF_8)); - } else { - lh = bkc.openLedgerNoRecovery(l.getLedgerId(), - BookKeeper.DigestType.MAC, - digestpw.getBytes(Charsets.UTF_8)); - } - } catch (BKException bke) { - throw new IOException("Exception opening ledger for " + l, bke); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted opening ledger for " + l, ie); - } - - BookKeeperEditLogInputStream in = null; - - try { - long lastAddConfirmed = lh.getLastAddConfirmed(); - if (lastAddConfirmed == -1) { - throw new SegmentEmptyException(); - } - - in = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed); - - long endTxId = HdfsServerConstants.INVALID_TXID; - FSEditLogOp op = in.readOp(); - while (op != null) { - if (endTxId == HdfsServerConstants.INVALID_TXID - || op.getTransactionId() == endTxId+1) { - endTxId = op.getTransactionId(); - } - op = in.readOp(); - } - return endTxId; - } finally { - if (in != null) { - in.close(); - } - } - } - - /** - * Get a list of all segments in the journal. - */ - List getLedgerList(boolean inProgressOk) - throws IOException { - return getLedgerList(-1, inProgressOk); - } - - private List getLedgerList(long fromTxId, - boolean inProgressOk) throws IOException { - List ledgers - = new ArrayList(); - try { - List ledgerNames = zkc.getChildren(ledgerPath, false); - for (String ledgerName : ledgerNames) { - if (!inProgressOk && ledgerName.contains(BKJM_EDIT_INPROGRESS)) { - continue; - } - String legderMetadataPath = ledgerPath + "/" + ledgerName; - try { - EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata - .read(zkc, legderMetadataPath); - if (editLogLedgerMetadata.getLastTxId() != HdfsServerConstants.INVALID_TXID - && editLogLedgerMetadata.getLastTxId() < fromTxId) { - // exclude already read closed edits, but include inprogress edits - // as this will be handled in caller - continue; - } - ledgers.add(editLogLedgerMetadata); - } catch (KeeperException.NoNodeException e) { - LOG.warn("ZNode: " + legderMetadataPath - + " might have finalized and deleted." 
- + " So ignoring NoNodeException."); - } - } - } catch (KeeperException e) { - throw new IOException("Exception reading ledger list from zk", e); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted getting list of ledgers from zk", ie); - } - - Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR); - return ledgers; - } - - /** - * Get the znode path for a finalize ledger - */ - String finalizedLedgerZNode(long startTxId, long endTxId) { - return String.format("%s/edits_%018d_%018d", - ledgerPath, startTxId, endTxId); - } - - /** - * Get the znode path for the inprogressZNode - */ - String inprogressZNode(long startTxid) { - return ledgerPath + "/inprogress_" + Long.toString(startTxid, 16); - } - - @VisibleForTesting - void setZooKeeper(ZooKeeper zk) { - this.zkc = zk; - } - - /** - * Simple watcher to notify when zookeeper has connected - */ - private class ZkConnectionWatcher implements Watcher { - public void process(WatchedEvent event) { - if (Event.KeeperState.SyncConnected.equals(event.getState())) { - zkConnectLatch.countDown(); - } - } - } - - private static class SegmentEmptyException extends IOException { - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java deleted file mode 100644 index 32d65cb..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.contrib.bkjournal; - -import java.io.IOException; -import java.net.InetAddress; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.KeeperException.NodeExistsException; -import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.Stat; - -import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.CurrentInprogressProto; -import com.google.protobuf.TextFormat; -import static com.google.common.base.Charsets.UTF_8; - -/** - * Distributed write permission lock, using ZooKeeper. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
deleted file mode 100644
index 32d65cb..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.IOException;
-import java.net.InetAddress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.CurrentInprogressProto;
-import com.google.protobuf.TextFormat;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Distributed write permission lock, using ZooKeeper. Reading the
- * CurrentInprogress node returns the currently recorded inprogress node
- * path, if any, and caches the znode's version number. If a path is
- * present, the caller can assume some other client is already operating on
- * it and act accordingly. If no path is present, the caller can assume no
- * other client is active, and should then record the path of its own newly
- * created inprogress node with an update. The update is written with the
- * version number obtained by the earlier read, so if any other activity has
- * touched the node in between, the version number will have changed and the
- * update will fail. This read-then-update API therefore ensures that only
- * one client can proceed after checking with CurrentInprogress.
- */
-
-class CurrentInprogress {
-  static final Log LOG = LogFactory.getLog(CurrentInprogress.class);
-
-  private final ZooKeeper zkc;
-  private final String currentInprogressNode;
-  private volatile int versionNumberForPermission = -1;
-  private final String hostName = InetAddress.getLocalHost().toString();
-
-  CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException {
-    this.currentInprogressNode = lockpath;
-    this.zkc = zkc;
-  }
-
-  void init() throws IOException {
-    try {
-      Stat isCurrentInprogressNodeExists = zkc.exists(currentInprogressNode,
-                                                      false);
-      if (isCurrentInprogressNodeExists == null) {
-        try {
-          zkc.create(currentInprogressNode, null, Ids.OPEN_ACL_UNSAFE,
-                     CreateMode.PERSISTENT);
-        } catch (NodeExistsException e) {
-          // The node might have been created by another process at the same
-          // time; ignore it.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(currentInprogressNode + " already created by other process.",
-                      e);
-          }
-        }
-      }
-    } catch (KeeperException e) {
-      throw new IOException("Exception accessing Zookeeper", e);
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted accessing Zookeeper", ie);
-    }
-  }
-
-  /**
-   * Update the CurrentInprogress node with the given path and the local
-   * hostname, using the version number cached by the last read.
-   *
-   * @param path
-   *          - the inprogress node path to record in zookeeper
-   * @throws IOException
-   */
-  void update(String path) throws IOException {
-    CurrentInprogressProto.Builder builder = CurrentInprogressProto.newBuilder();
-    builder.setPath(path).setHostname(hostName);
-
-    String content = TextFormat.printToString(builder.build());
-
-    try {
-      zkc.setData(this.currentInprogressNode, content.getBytes(UTF_8),
-                  this.versionNumberForPermission);
-    } catch (KeeperException e) {
-      throw new IOException("Exception when setting the data "
-          + "[" + content + "] to CurrentInprogress. ", e);
-    } catch (InterruptedException e) {
-      throw new IOException("Interrupted while setting the data "
-          + "[" + content + "] to CurrentInprogress", e);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Updated data[" + content + "] to CurrentInprogress");
-    }
-  }
-
-  /**
-   * Read the CurrentInprogress node data from Zookeeper and cache the znode
-   * version number. Return the path field from the data, i.e. the path
-   * saved with the #update API.
-   *
-   * @return the available inprogress node path; null if none is available.
-   * @throws IOException
-   */
-  String read() throws IOException {
-    Stat stat = new Stat();
-    byte[] data = null;
-    try {
-      data = zkc.getData(this.currentInprogressNode, false, stat);
-    } catch (KeeperException e) {
-      throw new IOException("Exception while reading the data from "
-          + currentInprogressNode, e);
-    } catch (InterruptedException e) {
-      throw new IOException("Interrupted while reading data from "
-          + currentInprogressNode, e);
-    }
-    this.versionNumberForPermission = stat.getVersion();
-    if (data != null) {
-      CurrentInprogressProto.Builder builder = CurrentInprogressProto.newBuilder();
-      TextFormat.merge(new String(data, UTF_8), builder);
-      if (!builder.isInitialized()) {
-        throw new IOException("Invalid/Incomplete data in znode");
-      }
-      return builder.build().getPath();
-    } else {
-      LOG.debug("No data available in CurrentInprogress");
-    }
-    return null;
-  }
-
-  /** Clear the CurrentInprogress node data */
-  void clear() throws IOException {
-    try {
-      zkc.setData(this.currentInprogressNode, null, versionNumberForPermission);
-    } catch (KeeperException e) {
-      throw new IOException(
-          "Exception when setting the data to CurrentInprogress node", e);
-    } catch (InterruptedException e) {
-      throw new IOException(
-          "Interrupted when setting the data to CurrentInprogress node", e);
-    }
-    LOG.debug("Cleared the data from CurrentInprogress");
-  }
-
-}
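Taken together, read(), update() and clear() implement a small
optimistic-concurrency protocol over a single znode: read() caches the znode
version it saw, and update() writes with exactly that version. A minimal
usage sketch, assuming a connected ZooKeeper handle zk and a hypothetical
lock path (illustrative only):

    CurrentInprogress ci = new CurrentInprogress(zk, "/ledgers/CurrentInprogress");
    ci.init();
    String existing = ci.read();              // also caches the znode version
    if (existing == null) {
      ci.update("/ledgers/inprogress_65");    // rejected if the version moved since read()
    } else {
      // another writer recorded 'existing'; recover or back off
    }

If two clients race through read() against the same version, only the first
setData() succeeds; the loser sees a KeeperException (surfaced here as an
IOException) and must read() again before retrying.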
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
deleted file mode 100644
index 2d1f8b9..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.IOException;
-import java.util.Comparator;
-
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.KeeperException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.EditLogLedgerProto;
-import com.google.protobuf.TextFormat;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Utility class for storing the metadata associated
- * with a single edit log segment, stored in a single ledger.
- */
-public class EditLogLedgerMetadata {
-  static final Log LOG = LogFactory.getLog(EditLogLedgerMetadata.class);
-
-  private String zkPath;
-  private final int dataLayoutVersion;
-  private final long ledgerId;
-  private final long firstTxId;
-  private long lastTxId;
-  private boolean inprogress;
-
-  public static final Comparator<EditLogLedgerMetadata> COMPARATOR
-      = new Comparator<EditLogLedgerMetadata>() {
-    public int compare(EditLogLedgerMetadata o1,
-                       EditLogLedgerMetadata o2) {
-      if (o1.firstTxId < o2.firstTxId) {
-        return -1;
-      } else if (o1.firstTxId == o2.firstTxId) {
-        return 0;
-      } else {
-        return 1;
-      }
-    }
-  };
-
-  EditLogLedgerMetadata(String zkPath, int dataLayoutVersion,
-                        long ledgerId, long firstTxId) {
-    this.zkPath = zkPath;
-    this.dataLayoutVersion = dataLayoutVersion;
-    this.ledgerId = ledgerId;
-    this.firstTxId = firstTxId;
-    this.lastTxId = HdfsServerConstants.INVALID_TXID;
-    this.inprogress = true;
-  }
-
-  EditLogLedgerMetadata(String zkPath, int dataLayoutVersion,
-                        long ledgerId, long firstTxId,
-                        long lastTxId) {
-    this.zkPath = zkPath;
-    this.dataLayoutVersion = dataLayoutVersion;
-    this.ledgerId = ledgerId;
-    this.firstTxId = firstTxId;
-    this.lastTxId = lastTxId;
-    this.inprogress = false;
-  }
-
-  String getZkPath() {
-    return zkPath;
-  }
-
-  long getFirstTxId() {
-    return firstTxId;
-  }
-
-  long getLastTxId() {
-    return lastTxId;
-  }
-
-  long getLedgerId() {
-    return ledgerId;
-  }
-
-  boolean isInProgress() {
-    return this.inprogress;
-  }
-
-  int getDataLayoutVersion() {
-    return this.dataLayoutVersion;
-  }
-
-  void finalizeLedger(long newLastTxId) {
-    assert this.lastTxId == HdfsServerConstants.INVALID_TXID;
-    this.lastTxId = newLastTxId;
-    this.inprogress = false;
-  }
-
-  static EditLogLedgerMetadata read(ZooKeeper zkc, String path)
-      throws IOException, KeeperException.NoNodeException {
-    try {
-      byte[] data = zkc.getData(path, false, null);
-
-      EditLogLedgerProto.Builder builder = EditLogLedgerProto.newBuilder();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Reading " + path + " data: " + new String(data, UTF_8));
-      }
-      TextFormat.merge(new String(data, UTF_8), builder);
-      if (!builder.isInitialized()) {
-        throw new IOException("Invalid/Incomplete data in znode");
-      }
-      EditLogLedgerProto ledger = builder.build();
-
-      int dataLayoutVersion = ledger.getDataLayoutVersion();
-      long ledgerId = ledger.getLedgerId();
-      long firstTxId = ledger.getFirstTxId();
-      if (ledger.hasLastTxId()) {
-        long lastTxId = ledger.getLastTxId();
-        return new EditLogLedgerMetadata(path, dataLayoutVersion,
-                                         ledgerId, firstTxId, lastTxId);
-      } else {
-        return new EditLogLedgerMetadata(path, dataLayoutVersion,
-                                         ledgerId, firstTxId);
-      }
-    } catch (KeeperException.NoNodeException nne) {
-      throw nne;
-    } catch (KeeperException ke) {
-      throw new IOException("Error reading from zookeeper", ke);
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted reading from zookeeper", ie);
-    }
-  }
-
-  void write(ZooKeeper zkc, String path)
-      throws IOException, KeeperException.NodeExistsException {
-    this.zkPath = path;
-
-    EditLogLedgerProto.Builder builder = EditLogLedgerProto.newBuilder();
-    builder.setDataLayoutVersion(dataLayoutVersion)
-      .setLedgerId(ledgerId).setFirstTxId(firstTxId);
-
-    if (!inprogress) {
-      builder.setLastTxId(lastTxId);
-    }
-    try {
-      zkc.create(path, TextFormat.printToString(builder.build()).getBytes(UTF_8),
-                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    } catch (KeeperException.NodeExistsException nee) {
-      throw nee;
-    } catch (KeeperException e) {
-      throw new IOException("Error creating ledger znode", e);
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted creating ledger znode", ie);
-    }
-  }
-
-  boolean verify(ZooKeeper zkc, String path) {
-    try {
-      EditLogLedgerMetadata other = read(zkc, path);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Verifying " + this.toString()
-                  + " against " + other);
-      }
-      return other.equals(this);
-    } catch (KeeperException e) {
-      LOG.error("Couldn't verify data in " + path, e);
-      return false;
-    } catch (IOException ie) {
-      LOG.error("Couldn't verify data in " + path, ie);
-      return false;
-    }
-  }
-
-  public boolean equals(Object o) {
-    if (!(o instanceof EditLogLedgerMetadata)) {
-      return false;
-    }
-    EditLogLedgerMetadata ol = (EditLogLedgerMetadata)o;
-    return ledgerId == ol.ledgerId
-        && dataLayoutVersion == ol.dataLayoutVersion
-        && firstTxId == ol.firstTxId
-        && lastTxId == ol.lastTxId;
-  }
-
-  public int hashCode() {
-    int hash = 1;
-    hash = hash * 31 + (int) ledgerId;
-    hash = hash * 31 + (int) firstTxId;
-    hash = hash * 31 + (int) lastTxId;
-    hash = hash * 31 + dataLayoutVersion;
-    return hash;
-  }
-
-  public String toString() {
-    return "[LedgerId:" + ledgerId +
-        ", firstTxId:" + firstTxId +
-        ", lastTxId:" + lastTxId +
-        ", dataLayoutVersion:" + dataLayoutVersion + "]";
-  }
-
-}
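EditLogLedgerMetadata is persisted in protobuf text format rather than
binary, which keeps the znode contents human-readable. A minimal sketch of
the round-trip performed by write() and read() above, with ZooKeeper taken
out of the picture (field values illustrative):

    EditLogLedgerProto.Builder b = EditLogLedgerProto.newBuilder();
    b.setDataLayoutVersion(1).setLedgerId(42).setFirstTxId(1).setLastTxId(100);
    String text = TextFormat.printToString(b.build());
    // text now reads, one field per line:
    //   dataLayoutVersion: 1
    //   ledgerId: 42
    //   firstTxId: 1
    //   lastTxId: 100

    EditLogLedgerProto.Builder parsed = EditLogLedgerProto.newBuilder();
    TextFormat.merge(text, parsed);              // inverse of printToString
    assert parsed.isInitialized();               // all required fields present
    assert parsed.build().getLedgerId() == 42;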
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
deleted file mode 100644
index 5a2eefa..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.MaxTxIdProto;
-import com.google.protobuf.TextFormat;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Utility class for storing and reading
- * the max seen txid in zookeeper.
- */
-class MaxTxId {
-  static final Log LOG = LogFactory.getLog(MaxTxId.class);
-
-  private final ZooKeeper zkc;
-  private final String path;
-
-  private Stat currentStat;
-
-  MaxTxId(ZooKeeper zkc, String path) {
-    this.zkc = zkc;
-    this.path = path;
-  }
-
-  synchronized void store(long maxTxId) throws IOException {
-    long currentMax = get();
-    if (currentMax < maxTxId) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Setting maxTxId to " + maxTxId);
-      }
-      reset(maxTxId);
-    }
-  }
-
-  synchronized void reset(long maxTxId) throws IOException {
-    try {
-      MaxTxIdProto.Builder builder = MaxTxIdProto.newBuilder().setTxId(maxTxId);
-
-      byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
-      if (currentStat != null) {
-        currentStat = zkc.setData(path, data, currentStat
-            .getVersion());
-      } else {
-        zkc.create(path, data, Ids.OPEN_ACL_UNSAFE,
-                   CreateMode.PERSISTENT);
-      }
-    } catch (KeeperException e) {
-      throw new IOException("Error writing max tx id", e);
-    } catch (InterruptedException e) {
-      throw new IOException("Interrupted while writing max tx id", e);
-    }
-  }
-
-  synchronized long get() throws IOException {
-    try {
-      currentStat = zkc.exists(path, false);
-      if (currentStat == null) {
-        return 0;
-      } else {
-        byte[] bytes = zkc.getData(path, false, currentStat);
-
-        MaxTxIdProto.Builder builder = MaxTxIdProto.newBuilder();
-        TextFormat.merge(new String(bytes, UTF_8), builder);
-        if (!builder.isInitialized()) {
-          throw new IOException("Invalid/Incomplete data in znode");
-        }
-
-        return builder.build().getTxId();
-      }
-    } catch (KeeperException e) {
-      throw new IOException("Error reading the max tx id from zk", e);
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted while reading the max tx id", ie);
-    }
-  }
-}
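MaxTxId layers a simple monotonic register on the same versioned-write
pattern: store() only ever moves the recorded txid forward, and reset()
writes through the Stat captured by the most recent get(). A short usage
sketch, assuming a connected ZooKeeper handle zk (the path is illustrative):

    MaxTxId maxTxId = new MaxTxId(zk, "/ledgers/maxtxid");
    long seen = maxTxId.get();      // 0 if the znode does not exist yet
    maxTxId.store(seen + 100);      // advances the stored maximum
    maxTxId.store(seen + 50);       // no-op: below the stored maximum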
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto
deleted file mode 100644
index 15fa479..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// This file contains protocol buffers that are used by bkjournal,
-// mostly for storing data in zookeeper.
-
-option java_package = "org.apache.hadoop.contrib.bkjournal";
-option java_outer_classname = "BKJournalProtos";
-option java_generate_equals_and_hash = true;
-package hadoop.hdfs;
-
-import "hdfs.proto";
-import "HdfsServer.proto";
-
-message VersionProto {
-  required int32 layoutVersion = 1;
-  optional NamespaceInfoProto namespaceInfo = 2;
-}
-
-message EditLogLedgerProto {
-  required int32 dataLayoutVersion = 1;
-  required int64 ledgerId = 2;
-  required int64 firstTxId = 3;
-  optional int64 lastTxId = 4;
-}
-
-message MaxTxIdProto {
-  required int64 txId = 1;
-}
-
-message CurrentInprogressProto {
-  required string path = 1;
-  optional string hostname = 2;
-}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org