hadoop-common-commits mailing list archives

From dran...@apache.org
Subject [29/50] [abbrv] hadoop git commit: HDFS-10957. Retire BKJM from trunk (Vinayakumar B)
Date Sat, 08 Oct 2016 06:10:25 GMT
HDFS-10957. Retire BKJM from trunk (Vinayakumar B)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/31195488
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/31195488
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/31195488

Branch: refs/heads/HADOOP-12756
Commit: 311954883f714973784432589896553eb320b597
Parents: 35b9d7d
Author: Vinayakumar B <vinayakumarb@apache.org>
Authored: Thu Oct 6 19:28:25 2016 +0530
Committer: Vinayakumar B <vinayakumarb@apache.org>
Committed: Thu Oct 6 19:28:25 2016 +0530

----------------------------------------------------------------------
 .../src/contrib/bkjournal/README.txt            |  66 --
 .../dev-support/findbugsExcludeFile.xml         |   5 -
 .../hadoop-hdfs/src/contrib/bkjournal/pom.xml   | 175 ----
 .../bkjournal/BookKeeperEditLogInputStream.java | 264 -----
 .../BookKeeperEditLogOutputStream.java          | 188 ----
 .../bkjournal/BookKeeperJournalManager.java     | 893 -----------------
 .../contrib/bkjournal/CurrentInprogress.java    | 160 ---
 .../bkjournal/EditLogLedgerMetadata.java        | 217 ----
 .../hadoop/contrib/bkjournal/MaxTxId.java       | 103 --
 .../bkjournal/src/main/proto/bkjournal.proto    |  49 -
 .../hadoop/contrib/bkjournal/BKJMUtil.java      | 184 ----
 .../bkjournal/TestBookKeeperAsHASharedDir.java  | 414 --------
 .../bkjournal/TestBookKeeperConfiguration.java  | 174 ----
 .../bkjournal/TestBookKeeperEditLogStreams.java |  92 --
 .../bkjournal/TestBookKeeperHACheckpoints.java  | 109 --
 .../bkjournal/TestBookKeeperJournalManager.java | 984 -------------------
 .../TestBookKeeperSpeculativeRead.java          | 167 ----
 .../bkjournal/TestBootstrapStandbyWithBKJM.java | 170 ----
 .../bkjournal/TestCurrentInprogress.java        | 160 ---
 .../hdfs/server/namenode/FSEditLogTestUtil.java |  40 -
 .../src/test/resources/log4j.properties         |  55 --
 .../markdown/HDFSHighAvailabilityWithNFS.md     | 114 ---
 hadoop-hdfs-project/pom.xml                     |   1 -
 hadoop-project/pom.xml                          |   6 -
 24 files changed, 4790 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt
deleted file mode 100644
index 7f67226..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/README.txt
+++ /dev/null
@@ -1,66 +0,0 @@
-This module provides a BookKeeper backend for HFDS Namenode write
-ahead logging.  
-
-BookKeeper is a highly available distributed write ahead logging
-system. For more details, see
-   
-    http://zookeeper.apache.org/bookkeeper
-
--------------------------------------------------------------------------------
-How do I build?
-
- To generate the distribution packages for BK journal, do the
- following.
-
-   $ mvn clean package -Pdist
-
- This will generate a jar with all the dependencies needed by the journal
- manager, 
-
- target/hadoop-hdfs-bkjournal-<VERSION>.jar
-
- Note that the -Pdist part of the build command is important, as otherwise
- the dependencies would not be packaged in the jar. 
-
--------------------------------------------------------------------------------
-How do I use the BookKeeper Journal?
-
- To run a HDFS namenode using BookKeeper as a backend, copy the bkjournal
- jar, generated above, into the lib directory of hdfs. In the standard 
- distribution of HDFS, this is at $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/
-
-  cp target/hadoop-hdfs-bkjournal-<VERSION>.jar \
-    $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/
-
- Then, in hdfs-site.xml, set the following properties.
-
-   <property>
-     <name>dfs.namenode.edits.dir</name>
-     <value>bookkeeper://localhost:2181/bkjournal,file:///path/for/edits</value>
-   </property>
-
-   <property>
-     <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
-     <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
-   </property>
-
- In this example, the namenode is configured to use 2 write ahead
- logging devices. One writes to BookKeeper and the other to a local
- file system. At the moment is is not possible to only write to 
- BookKeeper, as the resource checker explicitly checked for local
- disks currently.
-
- The given example, configures the namenode to look for the journal
- metadata at the path /bkjournal on the a standalone zookeeper ensemble
- at localhost:2181. To configure a multiple host zookeeper ensemble,
- separate the hosts with semicolons. For example, if you have 3
- zookeeper servers, zk1, zk2 & zk3, each listening on port 2181, you
- would specify this with 
-  
-   bookkeeper://zk1:2181;zk2:2181;zk3:2181/bkjournal
-
- The final part /bkjournal specifies the znode in zookeeper where
- ledger metadata will be store. Administrators can set this to anything
- they wish.
-
-
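
The removed README describes the journal URI as a semicolon-separated list of ZooKeeper host:port pairs followed by the root znode. A minimal sketch of splitting such a URI is given here, assuming java.net.URI is used for parsing. The class and method names are hypothetical and are not taken from the removed module. ZooKeeper connect strings separate hosts with commas, so the sketch replaces the semicolons.

```java
import java.net.URI;

/** Hypothetical helper, for illustration only; not part of the removed module. */
final class BkJournalUriSketch {

  /** Returns the ensemble as a comma-separated ZooKeeper connect string. */
  static String zkConnectString(URI uri) {
    // The authority holds everything between "//" and the first "/".
    return uri.getAuthority().replace(';', ',');
  }

  /** Returns the znode path under which ledger metadata would be stored. */
  static String rootZnode(URI uri) {
    return uri.getPath();
  }

  public static void main(String[] args) {
    URI uri = URI.create("bookkeeper://zk1:2181;zk2:2181;zk3:2181/bkjournal");
    System.out.println(zkConnectString(uri));
    System.out.println(rootZnode(uri));
  }
}
```

The sketch reads getAuthority() rather than getHost(), because a multi-host authority is not a single valid host name and getHost() would return null for it.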

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml
deleted file mode 100644
index 45c3a75..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/dev-support/findbugsExcludeFile.xml
+++ /dev/null
@@ -1,5 +0,0 @@
-<FindBugsFilter>
-     <Match>
-       <Class name="~org.apache.hadoop.contrib.bkjournal.BKJournalProtos.*" />
-     </Match>
-</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
deleted file mode 100644
index 7fb6c24..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
+++ /dev/null
@@ -1,175 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
-http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.hadoop</groupId>
-    <artifactId>hadoop-project</artifactId>
-    <version>3.0.0-alpha2-SNAPSHOT</version>
-    <relativePath>../../../../../hadoop-project</relativePath>
-  </parent>
-
-  <groupId>org.apache.hadoop.contrib</groupId>
-  <artifactId>hadoop-hdfs-bkjournal</artifactId>
-  <version>3.0.0-alpha2-SNAPSHOT</version>
-  <description>Apache Hadoop HDFS BookKeeper Journal</description>
-  <name>Apache Hadoop HDFS BookKeeper Journal</name>
-  <packaging>jar</packaging>
-
-  <properties>
-    <hadoop.component>hdfs</hadoop.component>
-    <hadoop.common.build.dir>${basedir}/../../../../../hadoop-common-project/hadoop-common/target</hadoop.common.build.dir>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>commons-logging</groupId>
-      <artifactId>commons-logging</artifactId>
-      <scope>compile</scope>
-    </dependency>
-    <dependency> 
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency> 
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency> 
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>bookkeeper-server</artifactId>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-maven-plugins</artifactId>
-        <executions>
-          <execution>
-            <id>compile-protoc</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>protoc</goal>
-            </goals>
-            <configuration>
-              <protocVersion>${protobuf.version}</protocVersion>
-              <protocCommand>${protoc.path}</protocCommand>
-              <imports>
-                <param>${basedir}/../../../../../hadoop-common-project/hadoop-common/src/main/proto</param>
-                <param>${basedir}/../../../../../hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto</param>
-                <param>${basedir}/../../../../../hadoop-hdfs-project/hadoop-hdfs/src/main/proto</param>
-                <param>${basedir}/src/main/proto</param>
-              </imports>
-              <source>
-                <directory>${basedir}/src/main/proto</directory>
-                <includes>
-                  <include>bkjournal.proto</include>
-                </includes>
-              </source>
-              <output>${project.build.directory}/generated-sources/java</output>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
-        <configuration>
-          <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.rat</groupId>
-        <artifactId>apache-rat-plugin</artifactId>
-        <configuration>
-          <excludes>
-            <exclude>dev-support/findbugsExcludeFile.xml</exclude>
-          </excludes>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-  <profiles>
-    <profile>
-      <id>dist</id>
-      <build>
-        <plugins>
-          <plugin>
-            <artifactId>maven-dependency-plugin</artifactId>
-            <version>2.8</version>
-            <executions>
-              <execution>
-                <id>dist</id>
-                <phase>package</phase>
-                <goals>
-                  <goal>copy</goal>
-                </goals>
-                <configuration>
-                  <artifactItems>
-                    <artifactItem>
-                      <groupId>org.apache.bookkeeper</groupId>
-                      <artifactId>bookkeeper-server</artifactId>
-                      <type>jar</type>
-                    </artifactItem>
-                  </artifactItems>
-                  <outputDirectory>${project.build.directory}/lib</outputDirectory>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-</project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
deleted file mode 100644
index 86da807..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Enumeration;
-
-import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.LedgerEntry;
-import org.apache.bookkeeper.client.BKException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Input stream which reads from a BookKeeper ledger.
- */
-class BookKeeperEditLogInputStream extends EditLogInputStream {
-  static final Log LOG = LogFactory.getLog(BookKeeperEditLogInputStream.class);
-
-  private final long firstTxId;
-  private final long lastTxId;
-  private final int logVersion;
-  private final boolean inProgress;
-  private final LedgerHandle lh;
-
-  private final FSEditLogOp.Reader reader;
-  private final FSEditLogLoader.PositionTrackingInputStream tracker;
-
-  /**
-   * Construct BookKeeper edit log input stream.
-   * Starts reading from the first entry of the ledger.
-   */
-  BookKeeperEditLogInputStream(final LedgerHandle lh, 
-                               final EditLogLedgerMetadata metadata)
-      throws IOException {
-    this(lh, metadata, 0);
-  }
-
-  /**
-   * Construct BookKeeper edit log input stream. 
-   * Starts reading from firstBookKeeperEntry. This allows the stream
-   * to take a shortcut during recovery, as it doesn't have to read
-   * every edit log transaction to find out what the last one is.
-   */
-  BookKeeperEditLogInputStream(LedgerHandle lh, EditLogLedgerMetadata metadata,
-                               long firstBookKeeperEntry) 
-      throws IOException {
-    this.lh = lh;
-    this.firstTxId = metadata.getFirstTxId();
-    this.lastTxId = metadata.getLastTxId();
-    this.logVersion = metadata.getDataLayoutVersion();
-    this.inProgress = metadata.isInProgress();
-
-    if (firstBookKeeperEntry < 0
-        || firstBookKeeperEntry > lh.getLastAddConfirmed()) {
-      throw new IOException("Invalid first bk entry to read: "
-          + firstBookKeeperEntry + ", LAC: " + lh.getLastAddConfirmed());
-    }
-    BufferedInputStream bin = new BufferedInputStream(
-        new LedgerInputStream(lh, firstBookKeeperEntry));
-    tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
-    DataInputStream in = new DataInputStream(tracker);
-
-    reader = FSEditLogOp.Reader.create(in, tracker, logVersion);
-  }
-
-  @Override
-  public long getFirstTxId() {
-    return firstTxId;
-  }
-
-  @Override
-  public long getLastTxId() {
-    return lastTxId;
-  }
-  
-  @Override
-  public int getVersion(boolean verifyVersion) throws IOException {
-    return logVersion;
-  }
-
-  @Override
-  protected FSEditLogOp nextOp() throws IOException {
-    return reader.readOp(false);
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      lh.close();
-    } catch (BKException e) {
-      throw new IOException("Exception closing ledger", e);
-    } catch (InterruptedException e) {
-      throw new IOException("Interrupted closing ledger", e);
-    }
-  }
-
-  @Override
-  public long getPosition() {
-    return tracker.getPos();
-  }
-
-  @Override
-  public long length() throws IOException {
-    return lh.getLength();
-  }
-  
-  @Override
-  public String getName() {
-    return String.format(
-        "BookKeeperLedger[ledgerId=%d,firstTxId=%d,lastTxId=%d]", lh.getId(),
-        firstTxId, lastTxId);
-  }
-
-  @Override
-  public boolean isInProgress() {
-    return inProgress;
-  }
-
-  /**
-   * Skip forward to specified transaction id.
-   * Currently we do this by just iterating forward.
-   * If this proves to be too expensive, this can be reimplemented
-   * with a binary search over bk entries
-   */
-  public void skipTo(long txId) throws IOException {
-    long numToSkip = getFirstTxId() - txId;
-
-    FSEditLogOp op = null;
-    for (long i = 0; i < numToSkip; i++) {
-      op = readOp();
-    }
-    if (op != null && op.getTransactionId() != txId-1) {
-      throw new IOException("Corrupt stream, expected txid "
-          + (txId-1) + ", got " + op.getTransactionId());
-    }
-  }
-
-  @Override
-  public String toString() {
-    return ("BookKeeperEditLogInputStream {" + this.getName() + "}");
-  }
-
-  @Override
-  public void setMaxOpSize(int maxOpSize) {
-    reader.setMaxOpSize(maxOpSize);
-  }
-
-  @Override
-  public boolean isLocalLog() {
-    return false;
-  }
-
-  /**
-   * Input stream implementation which can be used by 
-   * FSEditLogOp.Reader
-   */
-  private static class LedgerInputStream extends InputStream {
-    private long readEntries;
-    private InputStream entryStream = null;
-    private final LedgerHandle lh;
-    private final long maxEntry;
-
-    /**
-     * Construct ledger input stream
-     * @param lh the ledger handle to read from
-     * @param firstBookKeeperEntry ledger entry to start reading from
-     */
-    LedgerInputStream(LedgerHandle lh, long firstBookKeeperEntry) 
-        throws IOException {
-      this.lh = lh;
-      readEntries = firstBookKeeperEntry;
-
-      maxEntry = lh.getLastAddConfirmed();
-    }
-
-    /**
-     * Get input stream representing next entry in the
-     * ledger.
-     * @return input stream, or null if no more entries
-     */
-    private InputStream nextStream() throws IOException {
-      try {        
-        if (readEntries > maxEntry) {
-          return null;
-        }
-        Enumeration<LedgerEntry> entries 
-          = lh.readEntries(readEntries, readEntries);
-        readEntries++;
-        if (entries.hasMoreElements()) {
-            LedgerEntry e = entries.nextElement();
-            assert !entries.hasMoreElements();
-            return e.getEntryInputStream();
-        }
-      } catch (BKException e) {
-        throw new IOException("Error reading entries from bookkeeper", e);
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted reading entries from bookkeeper", e);
-      }
-      return null;
-    }
-
-    @Override
-    public int read() throws IOException {
-      byte[] b = new byte[1];
-      if (read(b, 0, 1) != 1) {
-        return -1;
-      } else {
-        return b[0];
-      }
-    }
-    
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-      try {
-        int read = 0;
-        if (entryStream == null) {
-          entryStream = nextStream();
-          if (entryStream == null) {
-            return read;
-          }
-        }
-
-        while (read < len) {
-          int thisread = entryStream.read(b, off+read, (len-read));
-          if (thisread == -1) {
-            entryStream = nextStream();
-            if (entryStream == null) {
-              return read;
-            }
-          } else {
-            read += thisread;
-          }
-        }
-        return read;
-      } catch (IOException e) {
-        throw e;
-      }
-
-    }
-  }
-}
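
The removed LedgerInputStream presents a sequence of ledger entries as one continuous stream, moving to the next entry whenever the current one is exhausted. The sketch below shows the same chaining idea without a BookKeeper dependency. It assumes the entries arrive as an Iterator of InputStream, and the class name ChainedEntryStream and its drain helper are hypothetical. Unlike the removed code, the sketch masks the single-byte read to the 0..255 range and returns -1 from the bulk read at end of data, which is what the java.io.InputStream contract specifies.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for LedgerInputStream: chains per-entry streams. */
final class ChainedEntryStream extends InputStream {
  private final Iterator<InputStream> entries;
  private InputStream current;

  ChainedEntryStream(Iterator<InputStream> entries) {
    this.entries = entries;
  }

  @Override
  public int read() throws IOException {
    byte[] b = new byte[1];
    // Mask to 0..255 so a 0xFF data byte is not mistaken for end of stream.
    return read(b, 0, 1) == 1 ? (b[0] & 0xff) : -1;
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if (len == 0) {
      return 0;
    }
    int read = 0;
    while (read < len) {
      if (current == null) {
        if (!entries.hasNext()) {
          break; // no more entries
        }
        current = entries.next();
      }
      int n = current.read(b, off + read, len - read);
      if (n == -1) {
        current = null; // entry exhausted, move on to the next one
      } else {
        read += n;
      }
    }
    return read == 0 ? -1 : read;
  }

  /** Drains a stream with single-byte reads; wraps IOException for brevity. */
  static List<Integer> drain(InputStream in) {
    List<Integer> out = new ArrayList<>();
    try {
      for (int v = in.read(); v != -1; v = in.read()) {
        out.add(v);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out;
  }

  public static void main(String[] args) {
    InputStream in = new ChainedEntryStream(Arrays.<InputStream>asList(
        new ByteArrayInputStream(new byte[] {1, (byte) 0xff}),
        new ByteArrayInputStream(new byte[] {2})).iterator());
    System.out.println(drain(in)); // prints [1, 255, 2]
  }
}
```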

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
deleted file mode 100644
index 865806b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.CountDownLatch;
-
-import java.util.Arrays;
-
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
-
-import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.io.DataOutputBuffer;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Output stream for BookKeeper Journal.
- * Multiple complete edit log entries are packed into a single bookkeeper
- * entry before sending it over the network. The fact that the edit log entries
- * are complete in the bookkeeper entries means that each bookkeeper log entry
- *can be read as a complete edit log. This is useful for recover, as we don't
- * need to read through the entire edit log segment to get the last written
- * entry.
- */
-class BookKeeperEditLogOutputStream
-  extends EditLogOutputStream implements AddCallback {
-  static final Log LOG = LogFactory.getLog(BookKeeperEditLogOutputStream.class);
-
-  private final DataOutputBuffer bufCurrent;
-  private final AtomicInteger outstandingRequests;
-  private final int transmissionThreshold;
-  private final LedgerHandle lh;
-  private CountDownLatch syncLatch;
-  private final AtomicInteger transmitResult
-    = new AtomicInteger(BKException.Code.OK);
-  private final Writer writer;
-
-  /**
-   * Construct an edit log output stream which writes to a ledger.
-
-   */
-  protected BookKeeperEditLogOutputStream(Configuration conf, LedgerHandle lh)
-      throws IOException {
-    super();
-
-    bufCurrent = new DataOutputBuffer();
-    outstandingRequests = new AtomicInteger(0);
-    syncLatch = null;
-    this.lh = lh;
-    this.writer = new Writer(bufCurrent);
-    this.transmissionThreshold
-      = conf.getInt(BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE,
-                    BookKeeperJournalManager.BKJM_OUTPUT_BUFFER_SIZE_DEFAULT);
-  }
-
-  @Override
-  public void create(int layoutVersion) throws IOException {
-    // noop
-  }
-
-  @Override
-  public void close() throws IOException {
-    setReadyToFlush();
-    flushAndSync(true);
-    try {
-      lh.close();
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted waiting on close", ie);
-    } catch (BKException bke) {
-      throw new IOException("BookKeeper error during close", bke);
-    }
-  }
-
-  @Override
-  public void abort() throws IOException {
-    try {
-      lh.close();
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted waiting on close", ie);
-    } catch (BKException bke) {
-      throw new IOException("BookKeeper error during abort", bke);
-    }
-
-  }
-
-  @Override
-  public void writeRaw(final byte[] data, int off, int len) throws IOException {
-    throw new IOException("Not supported for BK");
-  }
-
-  @Override
-  public void write(FSEditLogOp op) throws IOException {
-    writer.writeOp(op);
-
-    if (bufCurrent.getLength() > transmissionThreshold) {
-      transmit();
-    }
-  }
-
-  @Override
-  public void setReadyToFlush() throws IOException {
-    transmit();
-
-    synchronized (this) {
-      syncLatch = new CountDownLatch(outstandingRequests.get());
-    }
-  }
-
-  @Override
-  public void flushAndSync(boolean durable) throws IOException {
-    assert(syncLatch != null);
-    try {
-      syncLatch.await();
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted waiting on latch", ie);
-    }
-    if (transmitResult.get() != BKException.Code.OK) {
-      throw new IOException("Failed to write to bookkeeper; Error is ("
-                            + transmitResult.get() + ") "
-                            + BKException.getMessage(transmitResult.get()));
-    }
-
-    syncLatch = null;
-    // wait for whatever we wait on
-  }
-
-  /**
-   * Transmit the current buffer to bookkeeper.
-   * Synchronised at the FSEditLog level. #write() and #setReadyToFlush()
-   * are never called at the same time.
-   */
-  private void transmit() throws IOException {
-    if (!transmitResult.compareAndSet(BKException.Code.OK,
-                                     BKException.Code.OK)) {
-      throw new IOException("Trying to write to an errored stream;"
-          + " Error code : (" + transmitResult.get()
-          + ") " + BKException.getMessage(transmitResult.get()));
-    }
-    if (bufCurrent.getLength() > 0) {
-      byte[] entry = Arrays.copyOf(bufCurrent.getData(),
-                                   bufCurrent.getLength());
-      lh.asyncAddEntry(entry, this, null);
-      bufCurrent.reset();
-      outstandingRequests.incrementAndGet();
-    }
-  }
-
-  @Override
-  public void addComplete(int rc, LedgerHandle handle,
-                          long entryId, Object ctx) {
-    synchronized(this) {
-      outstandingRequests.decrementAndGet();
-      if (!transmitResult.compareAndSet(BKException.Code.OK, rc)) {
-        LOG.warn("Tried to set transmit result to (" + rc + ") \""
-            + BKException.getMessage(rc) + "\""
-            + " but is already (" + transmitResult.get() + ") \""
-            + BKException.getMessage(transmitResult.get()) + "\"");
-      }
-      CountDownLatch l = syncLatch;
-      if (l != null) {
-        l.countDown();
-      }
-    }
-  }
-}
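
The removed output stream coordinates asynchronous writes with a latch. Each transmit is counted as outstanding, setReadyToFlush() creates a CountDownLatch sized to the outstanding count, each completion callback counts it down, and flushAndSync() waits on it before checking the recorded result code. A minimal sketch of that pattern follows, with no BookKeeper dependency. The class LatchFlushSketch is hypothetical, a single-thread executor stands in for the asynchronous ledger write, and the result code passed to transmit() is an illustrative stand-in for a BKException code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of the latch-based flush pattern. */
final class LatchFlushSketch {
  static final int OK = 0;

  private final AtomicInteger outstanding = new AtomicInteger(0);
  private final AtomicInteger result = new AtomicInteger(OK);
  // Stand-in for the ledger: runs completion callbacks on another thread.
  private final ExecutorService fakeLedger = Executors.newSingleThreadExecutor();
  private CountDownLatch syncLatch;

  /** Stand-in for an asynchronous add that later completes with code rc. */
  void transmit(final int rc) {
    outstanding.incrementAndGet();
    fakeLedger.execute(() -> addComplete(rc));
  }

  /** Completion callback: records the first non-OK code and releases a waiter. */
  synchronized void addComplete(int rc) {
    outstanding.decrementAndGet();
    result.compareAndSet(OK, rc);
    if (syncLatch != null) {
      syncLatch.countDown();
    }
  }

  /** Sizes the latch to the number of writes still in flight. */
  synchronized void setReadyToFlush() {
    syncLatch = new CountDownLatch(outstanding.get());
  }

  /** Waits for the in-flight writes, then returns the recorded result code. */
  int flushAndSync() {
    CountDownLatch latch;
    synchronized (this) {
      latch = syncLatch;
    }
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted waiting on latch", e);
    }
    return result.get();
  }

  void shutdown() {
    fakeLedger.shutdown();
  }

  public static void main(String[] args) {
    LatchFlushSketch s = new LatchFlushSketch();
    s.transmit(OK);
    s.transmit(OK);
    s.setReadyToFlush();
    System.out.println(s.flushAndSync());
    s.shutdown();
  }
}
```

Both the callback and setReadyToFlush() hold the same lock, so a completion is either reflected in the outstanding count when the latch is sized or counted down on the latch afterwards, never lost between the two.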

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
deleted file mode 100644
index 8e4d032..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
+++ /dev/null
@@ -1,893 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.common.Storage;
-import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.namenode.JournalManager;
-import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
-import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.util.ZkUtils;
-
-import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.ZKUtil;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.io.IOException;
-
-import java.net.URI;
-
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.VersionProto;
-import com.google.protobuf.TextFormat;
-import static com.google.common.base.Charsets.UTF_8;
-
-import org.apache.commons.io.Charsets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import com.google.common.annotations.VisibleForTesting;
-/**
- * BookKeeper Journal Manager
- *
- * To use, add the following to hdfs-site.xml.
- * <pre>
- * {@code
- * <property>
- *   <name>dfs.namenode.edits.dir</name>
- *   <value>bookkeeper://zk1:2181;zk2:2181;zk3:2181/hdfsjournal</value>
- * </property>
- *
- * <property>
- *   <name>dfs.namenode.edits.journal-plugin.bookkeeper</name>
- *   <value>org.apache.hadoop.contrib.bkjournal.BookKeeperJournalManager</value>
- * </property>
- * }
- * </pre>
- * The URI format for bookkeeper is bookkeeper://[zkEnsemble]/[rootZnode]
- * [zkEnsemble] is a semicolon-separated list of ZooKeeper host:port
- * pairs. In the example above there are 3 servers in the ensemble,
- * zk1, zk2 &amp; zk3, each one listening on port 2181.
- *
- * [rootZnode] is the path of the ZooKeeper znode under which the edit log
- * information will be stored.
- *
- * Other configuration options are:
- * <ul>
- *   <li><b>dfs.namenode.bookkeeperjournal.output-buffer-size</b>
- *       Number of bytes a bookkeeper journal stream will buffer before
- *       forcing a flush. Default is 1024.</li>
- *   <li><b>dfs.namenode.bookkeeperjournal.ensemble-size</b>
- *       Number of bookkeeper servers in edit log ledger ensembles. This
- *       is the number of bookkeeper servers which need to be available
- *       for the ledger to be writable. Default is 3.</li>
- *   <li><b>dfs.namenode.bookkeeperjournal.quorum-size</b>
- *       Number of bookkeeper servers in the write quorum. This is the
- *       number of bookkeeper servers which must have acknowledged the
- *       write of an entry before it is considered written.
- *       Default is 2.</li>
- *   <li><b>dfs.namenode.bookkeeperjournal.digestPw</b>
- *       Password to use when creating ledgers. </li>
- *   <li><b>dfs.namenode.bookkeeperjournal.zk.session.timeout</b>
- *       Session timeout for Zookeeper client from BookKeeper Journal Manager.
- *       Hadoop recommends that this value be less than the ZKFC
- *       session timeout value. Default value is 3000.</li>
- * </ul>
- */
-public class BookKeeperJournalManager implements JournalManager {
-  static final Log LOG = LogFactory.getLog(BookKeeperJournalManager.class);
-
-  public static final String BKJM_OUTPUT_BUFFER_SIZE
-    = "dfs.namenode.bookkeeperjournal.output-buffer-size";
-  public static final int BKJM_OUTPUT_BUFFER_SIZE_DEFAULT = 1024;
-
-  public static final String BKJM_BOOKKEEPER_ENSEMBLE_SIZE
-    = "dfs.namenode.bookkeeperjournal.ensemble-size";
-  public static final int BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT = 3;
-
- public static final String BKJM_BOOKKEEPER_QUORUM_SIZE
-    = "dfs.namenode.bookkeeperjournal.quorum-size";
-  public static final int BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT = 2;
-
-  public static final String BKJM_BOOKKEEPER_DIGEST_PW
-    = "dfs.namenode.bookkeeperjournal.digestPw";
-  public static final String BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT = "";
-
-  private static final int BKJM_LAYOUT_VERSION = -1;
-  
-  public static final String BKJM_ZK_SESSION_TIMEOUT 
-    = "dfs.namenode.bookkeeperjournal.zk.session.timeout";
-  public static final int BKJM_ZK_SESSION_TIMEOUT_DEFAULT = 3000;
-
-  private static final String BKJM_EDIT_INPROGRESS = "inprogress_";
-
-  public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH
-    = "dfs.namenode.bookkeeperjournal.zk.availablebookies";
-
-  public static final String BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT
-    = "/ledgers/available";
-
-  public static final String BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS
-    = "dfs.namenode.bookkeeperjournal.speculativeReadTimeoutMs";
-  public static final int BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT
-    = 2000;
-
-  public static final String BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC
-    = "dfs.namenode.bookkeeperjournal.readEntryTimeoutSec";
-  public static final int BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT = 5;
-
-  public static final String BKJM_BOOKKEEPER_ACK_QUORUM_SIZE 
-    = "dfs.namenode.bookkeeperjournal.ack.quorum-size";
-
-  public static final String BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC
-    = "dfs.namenode.bookkeeperjournal.addEntryTimeoutSec";
-  public static final int BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT = 5;
-
-  private ZooKeeper zkc;
-  private final Configuration conf;
-  private final BookKeeper bkc;
-  private final CurrentInprogress ci;
-  private final String basePath;
-  private final String ledgerPath;
-  private final String versionPath;
-  private final MaxTxId maxTxId;
-  private final int ensembleSize;
-  private final int quorumSize;
-  private final int ackQuorumSize;
-  private final int addEntryTimeout;
-  private final String digestpw;
-  private final int speculativeReadTimeout;
-  private final int readEntryTimeout;
-  private final CountDownLatch zkConnectLatch;
-  private final NamespaceInfo nsInfo;
-  private boolean initialized = false;
-  private LedgerHandle currentLedger = null;
-
-  /**
-   * Construct a Bookkeeper journal manager.
-   */
-  public BookKeeperJournalManager(Configuration conf, URI uri,
-      NamespaceInfo nsInfo) throws IOException {
-    this.conf = conf;
-    this.nsInfo = nsInfo;
-
-    String zkConnect = uri.getAuthority().replace(";", ",");
-    basePath = uri.getPath();
-    ensembleSize = conf.getInt(BKJM_BOOKKEEPER_ENSEMBLE_SIZE,
-                               BKJM_BOOKKEEPER_ENSEMBLE_SIZE_DEFAULT);
-    quorumSize = conf.getInt(BKJM_BOOKKEEPER_QUORUM_SIZE,
-                             BKJM_BOOKKEEPER_QUORUM_SIZE_DEFAULT);
-    ackQuorumSize = conf.getInt(BKJM_BOOKKEEPER_ACK_QUORUM_SIZE, quorumSize);
-    addEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_SEC,
-                             BKJM_BOOKKEEPER_ADD_ENTRY_TIMEOUT_DEFAULT);
-    speculativeReadTimeout = conf.getInt(
-                             BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_MS,
-                             BKJM_BOOKKEEPER_SPECULATIVE_READ_TIMEOUT_DEFAULT);
-    readEntryTimeout = conf.getInt(BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_SEC,
-                             BKJM_BOOKKEEPER_READ_ENTRY_TIMEOUT_DEFAULT);
-
-    ledgerPath = basePath + "/ledgers";
-    String maxTxIdPath = basePath + "/maxtxid";
-    String currentInprogressNodePath = basePath + "/CurrentInprogress";
-    versionPath = basePath + "/version";
-    digestpw = conf.get(BKJM_BOOKKEEPER_DIGEST_PW,
-                        BKJM_BOOKKEEPER_DIGEST_PW_DEFAULT);
-
-    try {
-      zkConnectLatch = new CountDownLatch(1);
-      int bkjmZKSessionTimeout = conf.getInt(BKJM_ZK_SESSION_TIMEOUT,
-          BKJM_ZK_SESSION_TIMEOUT_DEFAULT);
-      zkc = new ZooKeeper(zkConnect, bkjmZKSessionTimeout,
-          new ZkConnectionWatcher());
-      // Configured zk session timeout + some extra grace period (here
-      // BKJM_ZK_SESSION_TIMEOUT_DEFAULT used as grace period)
-      int zkConnectionLatchTimeout = bkjmZKSessionTimeout
-          + BKJM_ZK_SESSION_TIMEOUT_DEFAULT;
-      if (!zkConnectLatch
-          .await(zkConnectionLatchTimeout, TimeUnit.MILLISECONDS)) {
-        throw new IOException("Error connecting to zookeeper");
-      }
-
-      prepareBookKeeperEnv();
-      ClientConfiguration clientConf = new ClientConfiguration();
-      clientConf.setSpeculativeReadTimeout(speculativeReadTimeout);
-      clientConf.setReadEntryTimeout(readEntryTimeout);
-      clientConf.setAddEntryTimeout(addEntryTimeout);
-      bkc = new BookKeeper(clientConf, zkc);
-    } catch (KeeperException e) {
-      throw new IOException("Error initializing zk", e);
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted while initializing bk journal manager",
-                            ie);
-    }
-
-    ci = new CurrentInprogress(zkc, currentInprogressNodePath);
-    maxTxId = new MaxTxId(zkc, maxTxIdPath);
-  }
-
-  /**
-   * Pre-creating bookkeeper metadata path in zookeeper.
-   */
-  private void prepareBookKeeperEnv() throws IOException {
-    // create bookie available path in zookeeper if it doesn't exist
-    final String zkAvailablePath = conf.get(BKJM_ZK_LEDGERS_AVAILABLE_PATH,
-        BKJM_ZK_LEDGERS_AVAILABLE_PATH_DEFAULT);
-    final CountDownLatch zkPathLatch = new CountDownLatch(1);
-
-    final AtomicBoolean success = new AtomicBoolean(false);
-    StringCallback callback = new StringCallback() {
-      @Override
-      public void processResult(int rc, String path, Object ctx, String name) {
-        if (KeeperException.Code.OK.intValue() == rc
-            || KeeperException.Code.NODEEXISTS.intValue() == rc) {
-          LOG.info("Successfully created bookie available path : "
-              + zkAvailablePath);
-          success.set(true);
-        } else {
-          KeeperException.Code code = KeeperException.Code.get(rc);
-          LOG.error("Error : "
-                  + KeeperException.create(code, path).getMessage()
-                  + ", failed to create bookie available path : "
-                  + zkAvailablePath);
-        }
-        zkPathLatch.countDown();
-      }
-    };
-    ZkUtils.asyncCreateFullPathOptimistic(zkc, zkAvailablePath, new byte[0],
-        Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, callback, null);
-
-    try {
-      if (!zkPathLatch.await(zkc.getSessionTimeout(), TimeUnit.MILLISECONDS)
-          || !success.get()) {
-        throw new IOException("Couldn't create bookie available path :"
-            + zkAvailablePath + ", timed out " + zkc.getSessionTimeout()
-            + " millis");
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new IOException(
-          "Interrupted when creating the bookie available path : "
-              + zkAvailablePath, e);
-    }
-  }
-
-  @Override
-  public void format(NamespaceInfo ns) throws IOException {
-    try {
-      // delete old info
-      Stat baseStat = null;
-      Stat ledgerStat = null;
-      if ((baseStat = zkc.exists(basePath, false)) != null) {
-        if ((ledgerStat = zkc.exists(ledgerPath, false)) != null) {
-          for (EditLogLedgerMetadata l : getLedgerList(true)) {
-            try {
-              bkc.deleteLedger(l.getLedgerId());
-            } catch (BKException.BKNoSuchLedgerExistsException bke) {
-              LOG.warn("Ledger " + l.getLedgerId() + " does not exist;"
-                       + " Cannot delete.");
-            }
-          }
-        }
-        ZKUtil.deleteRecursive(zkc, basePath);
-      }
-
-      // should be clean now.
-      zkc.create(basePath, new byte[] {'0'},
-          Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-      VersionProto.Builder builder = VersionProto.newBuilder();
-      builder.setNamespaceInfo(PBHelper.convert(ns))
-        .setLayoutVersion(BKJM_LAYOUT_VERSION);
-
-      byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
-      zkc.create(versionPath, data,
-                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-      zkc.create(ledgerPath, new byte[] {'0'},
-                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    } catch (KeeperException ke) {
-      LOG.error("Error accessing zookeeper to format", ke);
-      throw new IOException("Error accessing zookeeper to format", ke);
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted during format", ie);
-    } catch (BKException bke) {
-      throw new IOException("Error cleaning up ledgers during format", bke);
-    }
-  }
-  
-  @Override
-  public boolean hasSomeData() throws IOException {
-    try {
-      return zkc.exists(basePath, false) != null;
-    } catch (KeeperException ke) {
-      throw new IOException("Couldn't contact zookeeper", ke);
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted while checking for data", ie);
-    }
-  }
-
-  synchronized private void checkEnv() throws IOException {
-    if (!initialized) {
-      try {
-        Stat versionStat = zkc.exists(versionPath, false);
-        if (versionStat == null) {
-          throw new IOException("Environment not initialized. "
-                                +"Have you forgotten to format?");
-        }
-        byte[] d = zkc.getData(versionPath, false, versionStat);
-
-        VersionProto.Builder builder = VersionProto.newBuilder();
-        TextFormat.merge(new String(d, UTF_8), builder);
-        if (!builder.isInitialized()) {
-          throw new IOException("Invalid/Incomplete data in znode");
-        }
-        VersionProto vp = builder.build();
-
-        // There's only one version at the moment
-        assert vp.getLayoutVersion() == BKJM_LAYOUT_VERSION;
-
-        NamespaceInfo readns = PBHelper.convert(vp.getNamespaceInfo());
-
-        if (nsInfo.getNamespaceID() != readns.getNamespaceID() ||
-            !nsInfo.clusterID.equals(readns.getClusterID()) ||
-            !nsInfo.getBlockPoolID().equals(readns.getBlockPoolID())) {
-          String err = String.format("Environment mismatch. Running process %s"
-                                     +", stored in ZK %s", nsInfo, readns);
-          LOG.error(err);
-          throw new IOException(err);
-        }
-
-        ci.init();
-        initialized = true;
-      } catch (KeeperException ke) {
-        throw new IOException("Cannot access ZooKeeper", ke);
-      } catch (InterruptedException ie) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Interrupted while checking environment", ie);
-      }
-    }
-  }
-
-  /**
-   * Start a new log segment in a BookKeeper ledger.
-   * First ensure that we have the write lock for this journal.
-   * Then create a ledger and stream based on that ledger.
-   * The ledger id is written to the inprogress znode, so that in the
-   * case of a crash, a recovery process can find the ledger we were writing
-   * to when we crashed.
-   * @param txId First transaction id to be written to the stream
-   */
-  @Override
-  public EditLogOutputStream startLogSegment(long txId, int layoutVersion)
-      throws IOException {
-    checkEnv();
-
-    if (txId <= maxTxId.get()) {
-      throw new IOException("We've already seen " + txId
-          + ". A new stream cannot be created with it");
-    }
-
-    try {
-      String existingInprogressNode = ci.read();
-      if (null != existingInprogressNode
-          && zkc.exists(existingInprogressNode, false) != null) {
-        throw new IOException("Inprogress node already exists");
-      }
-      if (currentLedger != null) {
-        // bookkeeper errored on last stream, clean up ledger
-        currentLedger.close();
-      }
-      currentLedger = bkc.createLedger(ensembleSize, quorumSize, ackQuorumSize,
-                                       BookKeeper.DigestType.MAC,
-                                       digestpw.getBytes(Charsets.UTF_8));
-    } catch (BKException bke) {
-      throw new IOException("Error creating ledger", bke);
-    } catch (KeeperException ke) {
-      throw new IOException("Error in zookeeper while creating ledger", ke);
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted creating ledger", ie);
-    }
-
-    try {
-      String znodePath = inprogressZNode(txId);
-      EditLogLedgerMetadata l = new EditLogLedgerMetadata(znodePath,
-          layoutVersion, currentLedger.getId(), txId);
-      /* Write the ledger metadata out to the inprogress ledger znode
-       * This can fail if for some reason our write lock has
-       * expired (@see WriteLock) and another process has managed to
-       * create the inprogress znode.
-       * In this case, throw an exception. We don't want to continue
-       * as this would lead to a split brain situation.
-       */
-      l.write(zkc, znodePath);
-
-      maxTxId.store(txId);
-      ci.update(znodePath);
-      return new BookKeeperEditLogOutputStream(conf, currentLedger);
-    } catch (KeeperException ke) {
-      cleanupLedger(currentLedger);
-      throw new IOException("Error storing ledger metadata", ke);
-    }
-  }
-
-  private void cleanupLedger(LedgerHandle lh) {
-    try {
-      long id = currentLedger.getId();
-      currentLedger.close();
-      bkc.deleteLedger(id);
-    } catch (BKException bke) {
-      //log & ignore, an IOException will be thrown soon
-      LOG.error("Error closing ledger", bke);
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      LOG.warn("Interrupted while closing ledger", ie);
-    }
-  }
-
-
-
-  /**
-   * Finalize a log segment. If the journal manager is currently
-   * writing to a ledger, ensure that this is the ledger of the log segment
-   * being finalized.
-   *
-   * Otherwise this is the recovery case. In the recovery case, ensure that
-   * the firstTxId of the ledger matches firstTxId for the segment we are
-   * trying to finalize.
-   */
-  @Override
-  public void finalizeLogSegment(long firstTxId, long lastTxId)
-      throws IOException {
-    checkEnv();
-
-    String inprogressPath = inprogressZNode(firstTxId);
-    try {
-      Stat inprogressStat = zkc.exists(inprogressPath, false);
-      if (inprogressStat == null) {
-        throw new IOException("Inprogress znode " + inprogressPath
-                              + " doesn't exist");
-      }
-
-      EditLogLedgerMetadata l
-        =  EditLogLedgerMetadata.read(zkc, inprogressPath);
-
-      if (currentLedger != null) { // normal, non-recovery case
-        if (l.getLedgerId() == currentLedger.getId()) {
-          try {
-            currentLedger.close();
-          } catch (BKException bke) {
-            LOG.error("Error closing current ledger", bke);
-          }
-          currentLedger = null;
-        } else {
-          throw new IOException(
-              "Active ledger has different ID to inprogress. "
-              + l.getLedgerId() + " found, "
-              + currentLedger.getId() + " expected");
-        }
-      }
-
-      if (l.getFirstTxId() != firstTxId) {
-        throw new IOException("Transaction id not as expected, "
-            + l.getFirstTxId() + " found, " + firstTxId + " expected");
-      }
-
-      l.finalizeLedger(lastTxId);
-      String finalisedPath = finalizedLedgerZNode(firstTxId, lastTxId);
-      try {
-        l.write(zkc, finalisedPath);
-      } catch (KeeperException.NodeExistsException nee) {
-        if (!l.verify(zkc, finalisedPath)) {
-          throw new IOException("Node " + finalisedPath + " already exists"
-                                + " but data doesn't match");
-        }
-      }
-      maxTxId.store(lastTxId);
-      zkc.delete(inprogressPath, inprogressStat.getVersion());
-      String inprogressPathFromCI = ci.read();
-      if (inprogressPath.equals(inprogressPathFromCI)) {
-        ci.clear();
-      }
-    } catch (KeeperException e) {
-      throw new IOException("Error finalising ledger", e);
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Error finalising ledger", ie);
-    }
-  }
-
-  public void selectInputStreams(
-      Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk) throws IOException {
-    selectInputStreams(streams, fromTxnId, inProgressOk, false);
-  }
-
-  @Override
-  public void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxId, boolean inProgressOk, boolean onlyDurableTxns)
-      throws IOException {
-    List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
-        inProgressOk);
-    try {
-      BookKeeperEditLogInputStream elis = null;
-      for (EditLogLedgerMetadata l : currentLedgerList) {
-        long lastTxId = l.getLastTxId();
-        if (l.isInProgress()) {
-          lastTxId = recoverLastTxId(l, false);
-        }
-        // Check once again, required in case of InProgress and in case of any
-        // gap.
-        if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
-          LedgerHandle h;
-          if (l.isInProgress()) { // we don't want to fence the current journal
-            h = bkc.openLedgerNoRecovery(l.getLedgerId(),
-                BookKeeper.DigestType.MAC, digestpw.getBytes(Charsets.UTF_8));
-          } else {
-            h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
-                digestpw.getBytes(Charsets.UTF_8));
-          }
-          elis = new BookKeeperEditLogInputStream(h, l);
-          elis.skipTo(fromTxId);
-        } else {
-          // If there is a mismatch then there might be a gap, so we should not
-          // check further.
-          return;
-        }
-        streams.add(elis);
-        if (elis.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
-          return;
-        }
-        fromTxId = elis.getLastTxId() + 1;
-      }
-    } catch (BKException e) {
-      throw new IOException("Could not open ledger for " + fromTxId, e);
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted opening ledger for " + fromTxId, ie);
-    }
-  }
-
-  long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
-      throws IOException {
-    long count = 0;
-    long expectedStart = 0;
-    for (EditLogLedgerMetadata l : getLedgerList(inProgressOk)) {
-      long lastTxId = l.getLastTxId();
-      if (l.isInProgress()) {
-        lastTxId = recoverLastTxId(l, false);
-        if (lastTxId == HdfsServerConstants.INVALID_TXID) {
-          break;
-        }
-      }
-
-      assert lastTxId >= l.getFirstTxId();
-
-      if (lastTxId < fromTxId) {
-        continue;
-      } else if (l.getFirstTxId() <= fromTxId && lastTxId >= fromTxId) {
-        // we can start in the middle of a segment
-        count = (lastTxId - l.getFirstTxId()) + 1;
-        expectedStart = lastTxId + 1;
-      } else {
-        if (expectedStart != l.getFirstTxId()) {
-          if (count == 0) {
-            throw new CorruptionException("StartTxId " + l.getFirstTxId()
-                + " is not as expected " + expectedStart
-                + ". Gap in transaction log?");
-          } else {
-            break;
-          }
-        }
-        count += (lastTxId - l.getFirstTxId()) + 1;
-        expectedStart = lastTxId + 1;
-      }
-    }
-    return count;
-  }
-
-  @Override
-  public void recoverUnfinalizedSegments() throws IOException {
-    checkEnv();
-
-    synchronized (this) {
-      try {
-        List<String> children = zkc.getChildren(ledgerPath, false);
-        for (String child : children) {
-          if (!child.startsWith(BKJM_EDIT_INPROGRESS)) {
-            continue;
-          }
-          String znode = ledgerPath + "/" + child;
-          EditLogLedgerMetadata l = EditLogLedgerMetadata.read(zkc, znode);
-          try {
-            long endTxId = recoverLastTxId(l, true);
-            if (endTxId == HdfsServerConstants.INVALID_TXID) {
-              LOG.error("Unrecoverable corruption has occurred in segment "
-                  + l.toString() + " at path " + znode
-                  + ". Unable to continue recovery.");
-              throw new IOException("Unrecoverable corruption,"
-                  + " please check logs.");
-            }
-            finalizeLogSegment(l.getFirstTxId(), endTxId);
-          } catch (SegmentEmptyException see) {
-            LOG.warn("Inprogress znode " + child
-                + " refers to a ledger which is empty. This occurs when the NN"
-                + " crashes after opening a segment, but before writing the"
-                + " OP_START_LOG_SEGMENT op. It is safe to delete."
-                + " MetaData [" + l.toString() + "]");
-
-            // If the max seen transaction is the same as what would
-            // have been the first transaction of the failed ledger,
-            // decrement it, as that transaction never happened and as
-            // such, is _not_ the last seen
-            if (maxTxId.get() == l.getFirstTxId()) {
-              maxTxId.reset(maxTxId.get() - 1);
-            }
-
-            zkc.delete(znode, -1);
-          }
-        }
-      } catch (KeeperException.NoNodeException nne) {
-          // nothing to recover, ignore
-      } catch (KeeperException ke) {
-        throw new IOException("Couldn't get list of inprogress segments", ke);
-      } catch (InterruptedException ie) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Interrupted getting list of inprogress segments",
-                              ie);
-      }
-    }
-  }
-
-  @Override
-  public void purgeLogsOlderThan(long minTxIdToKeep)
-      throws IOException {
-    checkEnv();
-
-    for (EditLogLedgerMetadata l : getLedgerList(false)) {
-      if (l.getLastTxId() < minTxIdToKeep) {
-        try {
-          Stat stat = zkc.exists(l.getZkPath(), false);
-          zkc.delete(l.getZkPath(), stat.getVersion());
-          bkc.deleteLedger(l.getLedgerId());
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-          LOG.error("Interrupted while purging " + l, ie);
-        } catch (BKException bke) {
-          LOG.error("Couldn't delete ledger from bookkeeper", bke);
-        } catch (KeeperException ke) {
-          LOG.error("Error deleting ledger entry in zookeeper", ke);
-        }
-      }
-    }
-  }
-
-  @Override
-  public void doPreUpgrade() throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void doUpgrade(Storage storage) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getJournalCTime() throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void doFinalize() throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
-      int targetLayoutVersion) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void doRollback() throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void discardSegments(long startTxId) throws IOException {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      bkc.close();
-      zkc.close();
-    } catch (BKException bke) {
-      throw new IOException("Couldn't close bookkeeper client", bke);
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted while closing journal manager", ie);
-    }
-  }
-
-  /**
-   * Set the amount of memory that this stream should use to buffer edits.
-   * Setting this will only affect future output streams. Streams
-   * which have already been created won't be affected.
-   */
-  @Override
-  public void setOutputBufferCapacity(int size) {
-    conf.getInt(BKJM_OUTPUT_BUFFER_SIZE, size);
-  }
-
-  /**
-   * Find the id of the last edit log transaction written to an edit log
-   * ledger.
-   */
-  private long recoverLastTxId(EditLogLedgerMetadata l, boolean fence)
-      throws IOException, SegmentEmptyException {
-    LedgerHandle lh = null;
-    try {
-      if (fence) {
-        lh = bkc.openLedger(l.getLedgerId(),
-                            BookKeeper.DigestType.MAC,
-                            digestpw.getBytes(Charsets.UTF_8));
-      } else {
-        lh = bkc.openLedgerNoRecovery(l.getLedgerId(),
-                                      BookKeeper.DigestType.MAC,
-                                      digestpw.getBytes(Charsets.UTF_8));
-      }
-    } catch (BKException bke) {
-      throw new IOException("Exception opening ledger for " + l, bke);
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted opening ledger for " + l, ie);
-    }
-
-    BookKeeperEditLogInputStream in = null;
-
-    try {
-      long lastAddConfirmed = lh.getLastAddConfirmed();
-      if (lastAddConfirmed == -1) {
-        throw new SegmentEmptyException();
-      }
-
-      in = new BookKeeperEditLogInputStream(lh, l, lastAddConfirmed);
-
-      long endTxId = HdfsServerConstants.INVALID_TXID;
-      FSEditLogOp op = in.readOp();
-      while (op != null) {
-        if (endTxId == HdfsServerConstants.INVALID_TXID
-            || op.getTransactionId() == endTxId+1) {
-          endTxId = op.getTransactionId();
-        }
-        op = in.readOp();
-      }
-      return endTxId;
-    } finally {
-      if (in != null) {
-        in.close();
-      }
-    }
-  }
-
-  /**
-   * Get a list of all segments in the journal.
-   */
-  List<EditLogLedgerMetadata> getLedgerList(boolean inProgressOk)
-      throws IOException {
-    return getLedgerList(-1, inProgressOk);
-  }
-
-  private List<EditLogLedgerMetadata> getLedgerList(long fromTxId,
-      boolean inProgressOk) throws IOException {
-    List<EditLogLedgerMetadata> ledgers
-      = new ArrayList<EditLogLedgerMetadata>();
-    try {
-      List<String> ledgerNames = zkc.getChildren(ledgerPath, false);
-      for (String ledgerName : ledgerNames) {
-        if (!inProgressOk && ledgerName.contains(BKJM_EDIT_INPROGRESS)) {
-          continue;
-        }
-        String legderMetadataPath = ledgerPath + "/" + ledgerName;
-        try {
-          EditLogLedgerMetadata editLogLedgerMetadata = EditLogLedgerMetadata
-              .read(zkc, legderMetadataPath);
-          if (editLogLedgerMetadata.getLastTxId() != HdfsServerConstants.INVALID_TXID
-              && editLogLedgerMetadata.getLastTxId() < fromTxId) {
-            // exclude already read closed edits, but include inprogress edits
-            // as this will be handled in caller
-            continue;
-          }
-          ledgers.add(editLogLedgerMetadata);
-        } catch (KeeperException.NoNodeException e) {
-          LOG.warn("ZNode: " + legderMetadataPath
-              + " might have finalized and deleted."
-              + " So ignoring NoNodeException.");
-        }
-      }
-    } catch (KeeperException e) {
-      throw new IOException("Exception reading ledger list from zk", e);
-    } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupted getting list of ledgers from zk", ie);
-    }
-
-    Collections.sort(ledgers, EditLogLedgerMetadata.COMPARATOR);
-    return ledgers;
-  }
-
-  /**
-   * Get the znode path for a finalized ledger
-   */
-  String finalizedLedgerZNode(long startTxId, long endTxId) {
-    return String.format("%s/edits_%018d_%018d",
-                         ledgerPath, startTxId, endTxId);
-  }
-
-  /**
-   * Get the znode path for an inprogress ledger
-   */
-  String inprogressZNode(long startTxid) {
-    return ledgerPath + "/inprogress_" + Long.toString(startTxid, 16);
-  }
-
-  @VisibleForTesting
-  void setZooKeeper(ZooKeeper zk) {
-    this.zkc = zk;
-  }
-
-  /**
-   * Simple watcher to notify when zookeeper has connected
-   */
-  private class ZkConnectionWatcher implements Watcher {
-    public void process(WatchedEvent event) {
-      if (Event.KeeperState.SyncConnected.equals(event.getState())) {
-        zkConnectLatch.countDown();
-      }
-    }
-  }
-  
-  private static class SegmentEmptyException extends IOException {
-  }
-}
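
The two znode naming helpers removed above can be exercised outside HDFS. The following is a minimal sketch, assuming a hypothetical class name (ZNodeNamingSketch) and passing ledgerPath as a parameter instead of reading it from a field; the format strings are copied from the deleted methods.

```java
// Hypothetical demo class; not part of the removed bkjournal code.
class ZNodeNamingSketch {
  // Same format string as the removed finalizedLedgerZNode():
  // both txids are zero-padded to 18 decimal digits.
  static String finalizedLedgerZNode(String ledgerPath, long startTxId,
                                     long endTxId) {
    return String.format("%s/edits_%018d_%018d",
                         ledgerPath, startTxId, endTxId);
  }

  // Same expression as the removed inprogressZNode():
  // the start txid is rendered in unpadded hexadecimal.
  static String inprogressZNode(String ledgerPath, long startTxId) {
    return ledgerPath + "/inprogress_" + Long.toString(startTxId, 16);
  }

  public static void main(String[] args) {
    // "/ledgers" is an illustrative path, not a value from the commit.
    System.out.println(finalizedLedgerZNode("/ledgers", 1, 100));
    System.out.println(inprogressZNode("/ledgers", 255));
  }
}
```

Zero-padded decimal names sort lexicographically in txid order, while the unpadded hex names of in-progress znodes do not. That may be one reason the removed code sorts the ledger list with EditLogLedgerMetadata.COMPARATOR rather than relying on child name order.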

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
deleted file mode 100644
index 32d65cb..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/CurrentInprogress.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.IOException;
-import java.net.InetAddress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.CurrentInprogressProto;
-import com.google.protobuf.TextFormat;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Distributed write permission lock, using ZooKeeper. Read the version number
- * and return the current inprogress node path stored in the CurrentInprogress
- * znode. If a path exists, the caller can assume that some other client is
- * already operating on it and act accordingly. If no inprogress node exists,
- * the caller can assume that no client is operating on it, and should later
- * update the znode with its own newly created inprogress node path. If any
- * other client has modified the znode in the meantime, its version number will
- * have changed and the update will fail. This read-then-update API ensures
- * that only one node can proceed after checking CurrentInprogress.
- */
-
-class CurrentInprogress {
-  static final Log LOG = LogFactory.getLog(CurrentInprogress.class);
-
-  private final ZooKeeper zkc;
-  private final String currentInprogressNode;
-  private volatile int versionNumberForPermission = -1;
-  private final String hostName = InetAddress.getLocalHost().toString();
-
-  CurrentInprogress(ZooKeeper zkc, String lockpath) throws IOException {
-    this.currentInprogressNode = lockpath;
-    this.zkc = zkc;
-  }
-
-  void init() throws IOException {
-    try {
-      Stat isCurrentInprogressNodeExists = zkc.exists(currentInprogressNode,
-                                                      false);
-      if (isCurrentInprogressNodeExists == null) {
-        try {
-          zkc.create(currentInprogressNode, null, Ids.OPEN_ACL_UNSAFE,
-                     CreateMode.PERSISTENT);
-        } catch (NodeExistsException e) {
-          // Node might have been created by another process at the same time. Ignore it.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(currentInprogressNode + " already created by other process.",
-                      e);
-          }
-        }
-      }
-    } catch (KeeperException e) {
-      throw new IOException("Exception accessing Zookeeper", e);
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted accessing Zookeeper", ie);
-    }
-  }
-
-  /**
-   * Store the path and hostname, conditional on the last-read version number
-   * 
-   * @param path
-   *          - to be updated in zookeeper
-   * @throws IOException
-   */
-  void update(String path) throws IOException {
-    CurrentInprogressProto.Builder builder = CurrentInprogressProto.newBuilder();
-    builder.setPath(path).setHostname(hostName);
-
-    String content = TextFormat.printToString(builder.build());
-
-    try {
-      zkc.setData(this.currentInprogressNode, content.getBytes(UTF_8),
-          this.versionNumberForPermission);
-    } catch (KeeperException e) {
-      throw new IOException("Exception when setting the data "
-          + "[" + content + "] to CurrentInprogress. ", e);
-    } catch (InterruptedException e) {
-      throw new IOException("Interrupted while setting the data "
-          + "[" + content + "] to CurrentInprogress", e);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Updated data[" + content + "] to CurrentInprogress");
-    }
-  }
-
-  /**
-   * Read the CurrentInprogress node data from Zookeeper and also get the znode
-   * version number. Return the path field of the data, i.e. the path saved
-   * with the #update api
-   * 
-   * @return available inprogress node path. returns null if not available.
-   * @throws IOException
-   */
-  String read() throws IOException {
-    Stat stat = new Stat();
-    byte[] data = null;
-    try {
-      data = zkc.getData(this.currentInprogressNode, false, stat);
-    } catch (KeeperException e) {
-      throw new IOException("Exception while reading the data from "
-          + currentInprogressNode, e);
-    } catch (InterruptedException e) {
-      throw new IOException("Interrupted while reading data from "
-          + currentInprogressNode, e);
-    }
-    this.versionNumberForPermission = stat.getVersion();
-    if (data != null) {
-      CurrentInprogressProto.Builder builder = CurrentInprogressProto.newBuilder();
-      TextFormat.merge(new String(data, UTF_8), builder);
-      if (!builder.isInitialized()) {
-        throw new IOException("Invalid/Incomplete data in znode");
-      }
-      return builder.build().getPath();
-    } else {
-      LOG.debug("No data available in CurrentInprogress");
-    }
-    return null;
-  }
-
-  /** Clear the CurrentInprogress node data */
-  void clear() throws IOException {
-    try {
-      zkc.setData(this.currentInprogressNode, null, versionNumberForPermission);
-    } catch (KeeperException e) {
-      throw new IOException(
-          "Exception when setting the data to CurrentInprogress node", e);
-    } catch (InterruptedException e) {
-      throw new IOException(
-          "Interrupted when setting the data to CurrentInprogress node", e);
-    }
-    LOG.debug("Cleared the data from CurrentInprogress");
-  }
-
-}
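
The read-then-update pattern in the removed CurrentInprogress class is a form of optimistic concurrency control: a write succeeds only if the znode version still matches the one seen at read time. The sketch below models that with an in-memory stand-in. VersionedNodeSketch and its methods are hypothetical and are not the ZooKeeper client API; the only ZooKeeper behaviour assumed is that a conditional setData with version -1 matches any version and that a mismatch is rejected.

```java
import java.nio.charset.StandardCharsets;

// In-memory stand-in for a single znode (hypothetical, for illustration).
class VersionedNodeSketch {
  private byte[] data;
  private int version = 0;

  // Analogous to reading Stat.getVersion() after a getData() call.
  synchronized int version() {
    return version;
  }

  synchronized byte[] read() {
    return data == null ? null : data.clone();
  }

  // Conditional write: succeeds only if expectedVersion matches the
  // current version, or is -1 (which ZooKeeper treats as "any version").
  synchronized boolean setData(byte[] newData, int expectedVersion) {
    if (expectedVersion != -1 && expectedVersion != version) {
      // The real client would throw KeeperException.BadVersionException.
      return false;
    }
    data = newData == null ? null : newData.clone();
    version++;
    return true;
  }

  public static void main(String[] args) {
    VersionedNodeSketch node = new VersionedNodeSketch();
    // Two writers read the same version before either of them updates.
    int seenByA = node.version();
    int seenByB = node.version();
    boolean aWon = node.setData(
        "inprogress_a".getBytes(StandardCharsets.UTF_8), seenByA);
    boolean bWon = node.setData(
        "inprogress_b".getBytes(StandardCharsets.UTF_8), seenByB);
    System.out.println("A updated: " + aWon + ", B updated: " + bWon);
  }
}
```

Note that the removed class initialises versionNumberForPermission to -1, so an update() or clear() issued before any read() would be unconditional.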

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
deleted file mode 100644
index 2d1f8b9..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/EditLogLedgerMetadata.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.IOException;
-import java.util.Comparator;
-
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.KeeperException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.EditLogLedgerProto;
-import com.google.protobuf.TextFormat;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Utility class for storing the metadata associated 
- * with a single edit log segment, stored in a single ledger
- */
-public class EditLogLedgerMetadata {
-  static final Log LOG = LogFactory.getLog(EditLogLedgerMetadata.class);
-
-  private String zkPath;
-  private final int dataLayoutVersion;
-  private final long ledgerId;
-  private final long firstTxId;
-  private long lastTxId;
-  private boolean inprogress;
-  
-  public static final Comparator<EditLogLedgerMetadata> COMPARATOR
-    = new Comparator<EditLogLedgerMetadata>() {
-    public int compare(EditLogLedgerMetadata o1,
-        EditLogLedgerMetadata o2) {
-      if (o1.firstTxId < o2.firstTxId) {
-        return -1;
-      } else if (o1.firstTxId == o2.firstTxId) {
-        return 0;
-      } else {
-        return 1;
-      }
-    }
-  };
-
-  EditLogLedgerMetadata(String zkPath, int dataLayoutVersion,
-                        long ledgerId, long firstTxId) {
-    this.zkPath = zkPath;
-    this.dataLayoutVersion = dataLayoutVersion;
-    this.ledgerId = ledgerId;
-    this.firstTxId = firstTxId;
-    this.lastTxId = HdfsServerConstants.INVALID_TXID;
-    this.inprogress = true;
-  }
-  
-  EditLogLedgerMetadata(String zkPath, int dataLayoutVersion,
-                        long ledgerId, long firstTxId,
-                        long lastTxId) {
-    this.zkPath = zkPath;
-    this.dataLayoutVersion = dataLayoutVersion;
-    this.ledgerId = ledgerId;
-    this.firstTxId = firstTxId;
-    this.lastTxId = lastTxId;
-    this.inprogress = false;
-  }
-
-  String getZkPath() {
-    return zkPath;
-  }
-
-  long getFirstTxId() {
-    return firstTxId;
-  }
-  
-  long getLastTxId() {
-    return lastTxId;
-  }
-  
-  long getLedgerId() {
-    return ledgerId;
-  }
-  
-  boolean isInProgress() {
-    return this.inprogress;
-  }
-
-  int getDataLayoutVersion() {
-    return this.dataLayoutVersion;
-  }
-
-  void finalizeLedger(long newLastTxId) {
-    assert this.lastTxId == HdfsServerConstants.INVALID_TXID;
-    this.lastTxId = newLastTxId;
-    this.inprogress = false;      
-  }
-  
-  static EditLogLedgerMetadata read(ZooKeeper zkc, String path)
-      throws IOException, KeeperException.NoNodeException  {
-    try {
-      byte[] data = zkc.getData(path, false, null);
-
-      EditLogLedgerProto.Builder builder = EditLogLedgerProto.newBuilder();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Reading " + path + " data: " + new String(data, UTF_8));
-      }
-      TextFormat.merge(new String(data, UTF_8), builder);
-      if (!builder.isInitialized()) {
-        throw new IOException("Invalid/Incomplete data in znode");
-      }
-      EditLogLedgerProto ledger = builder.build();
-
-      int dataLayoutVersion = ledger.getDataLayoutVersion();
-      long ledgerId = ledger.getLedgerId();
-      long firstTxId = ledger.getFirstTxId();
-      if (ledger.hasLastTxId()) {
-        long lastTxId = ledger.getLastTxId();
-        return new EditLogLedgerMetadata(path, dataLayoutVersion,
-                                         ledgerId, firstTxId, lastTxId);
-      } else {
-        return new EditLogLedgerMetadata(path, dataLayoutVersion,
-                                         ledgerId, firstTxId);
-      }
-    } catch(KeeperException.NoNodeException nne) {
-      throw nne;
-    } catch(KeeperException ke) {
-      throw new IOException("Error reading from zookeeper", ke);
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted reading from zookeeper", ie);
-    }
-  }
-    
-  void write(ZooKeeper zkc, String path)
-      throws IOException, KeeperException.NodeExistsException {
-    this.zkPath = path;
-
-    EditLogLedgerProto.Builder builder = EditLogLedgerProto.newBuilder();
-    builder.setDataLayoutVersion(dataLayoutVersion)
-      .setLedgerId(ledgerId).setFirstTxId(firstTxId);
-
-    if (!inprogress) {
-      builder.setLastTxId(lastTxId);
-    }
-    try {
-      zkc.create(path, TextFormat.printToString(builder.build()).getBytes(UTF_8),
-                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-    } catch (KeeperException.NodeExistsException nee) {
-      throw nee;
-    } catch (KeeperException e) {
-      throw new IOException("Error creating ledger znode", e);
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted creating ledger znode", ie);
-    }
-  }
-  
-  boolean verify(ZooKeeper zkc, String path) {
-    try {
-      EditLogLedgerMetadata other = read(zkc, path);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Verifying " + this.toString() 
-                  + " against " + other);
-      }
-      return other.equals(this);
-    } catch (KeeperException e) {
-      LOG.error("Couldn't verify data in " + path, e);
-      return false;
-    } catch (IOException ie) {
-      LOG.error("Couldn't verify data in " + path, ie);
-      return false;
-    }
-  }
-  
-  public boolean equals(Object o) {
-    if (!(o instanceof EditLogLedgerMetadata)) {
-      return false;
-    }
-    EditLogLedgerMetadata ol = (EditLogLedgerMetadata)o;
-    return ledgerId == ol.ledgerId
-      && dataLayoutVersion == ol.dataLayoutVersion
-      && firstTxId == ol.firstTxId
-      && lastTxId == ol.lastTxId;
-  }
-
-  public int hashCode() {
-    int hash = 1;
-    hash = hash * 31 + (int) ledgerId;
-    hash = hash * 31 + (int) firstTxId;
-    hash = hash * 31 + (int) lastTxId;
-    hash = hash * 31 + dataLayoutVersion;
-    return hash;
-  }
-    
-  public String toString() {
-    return "[LedgerId:"+ledgerId +
-      ", firstTxId:" + firstTxId +
-      ", lastTxId:" + lastTxId +
-      ", dataLayoutVersion:" + dataLayoutVersion + "]";
-  }
-
-}
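
The removed COMPARATOR orders segments by firstTxId using an explicit three-way branch. On Java 8 or later the same ordering could be expressed more compactly; the sketch below assumes a hypothetical Segment class that carries only the field used for ordering, and is not a drop-in replacement for the deleted code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical demo class; not part of the removed bkjournal code.
class SegmentOrderSketch {
  // Stand-in for EditLogLedgerMetadata with only the ordering field.
  static final class Segment {
    final long firstTxId;

    Segment(long firstTxId) {
      this.firstTxId = firstTxId;
    }
  }

  // Ascending by firstTxId, the same order the removed comparator yields.
  static final Comparator<Segment> BY_FIRST_TXID =
      Comparator.comparingLong(s -> s.firstTxId);

  // Sorts the given first txids via the comparator and returns them.
  static List<Long> sortedFirstTxIds(long... ids) {
    List<Segment> segments = new ArrayList<>();
    for (long id : ids) {
      segments.add(new Segment(id));
    }
    segments.sort(BY_FIRST_TXID);
    List<Long> out = new ArrayList<>();
    for (Segment s : segments) {
      out.add(s.firstTxId);
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(sortedFirstTxIds(201, 1, 101));
  }
}
```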

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
deleted file mode 100644
index 5a2eefa..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/MaxTxId.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.contrib.bkjournal;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import org.apache.hadoop.contrib.bkjournal.BKJournalProtos.MaxTxIdProto;
-import com.google.protobuf.TextFormat;
-import static com.google.common.base.Charsets.UTF_8;
-
-/**
- * Utility class for storing and reading
- * the max seen txid in zookeeper
- */
-class MaxTxId {
-  static final Log LOG = LogFactory.getLog(MaxTxId.class);
-  
-  private final ZooKeeper zkc;
-  private final String path;
-
-  private Stat currentStat;
-
-  MaxTxId(ZooKeeper zkc, String path) {
-    this.zkc = zkc;
-    this.path = path;
-  }
-
-  synchronized void store(long maxTxId) throws IOException {
-    long currentMax = get();
-    if (currentMax < maxTxId) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Setting maxTxId to " + maxTxId);
-      }
-      reset(maxTxId);
-    }
-  }
-
-  synchronized void reset(long maxTxId) throws IOException {
-    try {
-      MaxTxIdProto.Builder builder = MaxTxIdProto.newBuilder().setTxId(maxTxId);
-
-      byte[] data = TextFormat.printToString(builder.build()).getBytes(UTF_8);
-      if (currentStat != null) {
-        currentStat = zkc.setData(path, data, currentStat
-            .getVersion());
-      } else {
-        zkc.create(path, data, Ids.OPEN_ACL_UNSAFE,
-                   CreateMode.PERSISTENT);
-      }
-    } catch (KeeperException e) {
-      throw new IOException("Error writing max tx id", e);
-    } catch (InterruptedException e) {
-      throw new IOException("Interrupted while writing max tx id", e);
-    }
-  }
-
-  synchronized long get() throws IOException {
-    try {
-      currentStat = zkc.exists(path, false);
-      if (currentStat == null) {
-        return 0;
-      } else {
-
-        byte[] bytes = zkc.getData(path, false, currentStat);
-
-        MaxTxIdProto.Builder builder = MaxTxIdProto.newBuilder();
-        TextFormat.merge(new String(bytes, UTF_8), builder);
-        if (!builder.isInitialized()) {
-          throw new IOException("Invalid/Incomplete data in znode");
-        }
-
-        return builder.build().getTxId();
-      }
-    } catch (KeeperException e) {
-      throw new IOException("Error reading the max tx id from zk", e);
-    } catch (InterruptedException ie) {
-      throw new IOException("Interrupted while reading the max tx id", ie);
-    }
-  }
-}
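
The contract of the removed MaxTxId class is that store() only ever raises the recorded value, while reset() overwrites it unconditionally. A minimal in-memory sketch of that contract follows; MaxTxIdSketch is a hypothetical name, and the ZooKeeper persistence and versioned writes of the original are deliberately left out.

```java
// Hypothetical in-memory model of the store/reset/get contract.
class MaxTxIdSketch {
  // The removed get() returns 0 when the znode does not exist yet.
  private long current = 0;

  synchronized long get() {
    return current;
  }

  // Raise the recorded maximum; smaller or equal values are ignored.
  synchronized void store(long maxTxId) {
    if (current < maxTxId) {
      current = maxTxId;
    }
  }

  // Overwrite unconditionally, even with a smaller value.
  synchronized void reset(long maxTxId) {
    current = maxTxId;
  }

  public static void main(String[] args) {
    MaxTxIdSketch max = new MaxTxIdSketch();
    max.store(100);
    max.store(50);   // ignored, 50 < 100
    System.out.println(max.get());
    max.reset(50);   // overwrites
    System.out.println(max.get());
  }
}
```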

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31195488/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto
deleted file mode 100644
index 15fa479..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/proto/bkjournal.proto
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// This file contains protocol buffers that are used by bkjournal
-// mostly for storing data in zookeeper
-
-option java_package = "org.apache.hadoop.contrib.bkjournal";
-option java_outer_classname = "BKJournalProtos";
-option java_generate_equals_and_hash = true;
-package hadoop.hdfs;
-
-import "hdfs.proto";
-import "HdfsServer.proto";
-
-message VersionProto {
-  required int32 layoutVersion = 1;
-  optional NamespaceInfoProto namespaceInfo = 2;
-}
-
-message EditLogLedgerProto {
-  required int32 dataLayoutVersion = 1;
-  required int64 ledgerId = 2;
-  required int64 firstTxId = 3;
-  optional int64 lastTxId = 4;
-}
-
-message MaxTxIdProto {
-  required int64 txId = 1;
-}
-
-message CurrentInprogressProto {
-  required string path = 1;
-  optional string hostname = 2;
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

