hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject hadoop git commit: HDFS-12149. Ozone: RocksDB implementation of ozone metadata store. Contributed by Weiwei Yang.
Date Fri, 21 Jul 2017 01:27:12 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 c2f2e0fa2 -> 0edc4c3dd


HDFS-12149. Ozone: RocksDB implementation of ozone metadata store. Contributed by Weiwei Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0edc4c3d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0edc4c3d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0edc4c3d

Branch: refs/heads/HDFS-7240
Commit: 0edc4c3dd8f6e6adcb57dfb81a891c92e4e183f9
Parents: c2f2e0f
Author: Anu Engineer <aengineer@apache.org>
Authored: Thu Jul 20 18:22:03 2017 -0700
Committer: Anu Engineer <aengineer@apache.org>
Committed: Thu Jul 20 18:22:03 2017 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/pom.xml         |   5 +
 .../org/apache/hadoop/utils/LevelDBStore.java   |   5 +-
 .../hadoop/utils/MetadataStoreBuilder.java      |  12 +-
 .../org/apache/hadoop/utils/RocksDBStore.java   | 318 +++++++++++++++++++
 .../src/main/resources/ozone-default.xml        |   2 +-
 .../apache/hadoop/ozone/TestMetadataStore.java  |  99 +++++-
 6 files changed, 433 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0edc4c3d/hadoop-hdfs-project/hadoop-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 22e4aa8..61ccca9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -193,6 +193,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>leveldbjni-all</artifactId>
       <version>1.8</version>
     </dependency>
+    <dependency>
+      <groupId>org.rocksdb</groupId>
+      <artifactId>rocksdbjni</artifactId>
+      <version>5.5.5</version>
+    </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.bouncycastle</groupId>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0edc4c3d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
index 415b788..c7df429 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
@@ -126,7 +126,9 @@ public class LevelDBStore implements MetadataStore {
    */
   @Override
   public void close() throws IOException {
-    db.close();
+    if (db != null){
+      db.close();
+    }
   }
 
   /**
@@ -163,6 +165,7 @@ public class LevelDBStore implements MetadataStore {
 
   @Override
   public void destroy() throws IOException {
+    close();
     JniDBFactory.factory.destroy(dbFile, dbOptions);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0edc4c3d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
index 81f2d8a..5546549 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.utils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.iq80.leveldb.Options;
+import org.rocksdb.BlockBasedTableConfig;
 
 import java.io.File;
 import java.io.IOException;
@@ -82,8 +83,15 @@ public class MetadataStoreBuilder {
       }
       store = new LevelDBStore(dbFile, options);
     } else if (OZONE_METADATA_STORE_IMPL_ROCKSDB.equals(impl)) {
-      // TODO replace with rocksDB impl
-      store = new LevelDBStore(dbFile, new Options());
+      org.rocksdb.Options opts = new org.rocksdb.Options();
+      opts.setCreateIfMissing(createIfMissing);
+
+      if (cacheSize > 0) {
+        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+        tableConfig.setBlockCacheSize(cacheSize);
+        opts.setTableFormatConfig(tableConfig);
+      }
+      store = new RocksDBStore(dbFile, opts);
     } else {
       throw new IllegalArgumentException("Invalid argument for "
           + OzoneConfigKeys.OZONE_METADATA_STORE_IMPL

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0edc4c3d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
new file mode 100644
index 0000000..b2e5e2a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Options;
+import org.rocksdb.WriteOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.DbPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.AbstractMap;
+
+/**
+ * RocksDB implementation of ozone metadata store.
+ */
+public class RocksDBStore implements MetadataStore {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RocksDBStore.class);
+
+  private RocksDB db = null;
+  private File dbLocation;
+  private WriteOptions writeOptions;
+  private Options dbOptions;
+
+  public RocksDBStore(File dbFile, Options options) throws IOException {
+    Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
+    RocksDB.loadLibrary();
+    dbOptions = options;
+    dbLocation = dbFile;
+    writeOptions = new WriteOptions();
+    writeOptions.setSync(true);
+    writeOptions.setNoSlowdown(true);
+    try {
+      db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath());
+    } catch (RocksDBException e) {
+      throw new IOException("Failed init RocksDB, db path : "
+          + dbFile.getAbsolutePath(), e);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RocksDB successfully opened.");
+      LOG.debug("[Option] dbLocation= {}", dbLocation.getAbsolutePath());
+      LOG.debug("[Option] createIfMissing = {}", options.createIfMissing());
+      LOG.debug("[Option] compactionPriority= {}", options.compactionStyle());
+      LOG.debug("[Option] compressionType= {}", options.compressionType());
+      LOG.debug("[Option] maxOpenFiles= {}", options.maxOpenFiles());
+      LOG.debug("[Option] writeBufferSize= {}", options.writeBufferSize());
+    }
+  }
+
+  private IOException toIOException(String msg, RocksDBException e) {
+    String statusCode = e.getStatus() == null ? "N/A" :
+        e.getStatus().getCodeString();
+    String errMessage = e.getMessage() == null ? "Unknown error" :
+        e.getMessage();
+    String output = msg + "; status : " + statusCode
+        + "; message : " + errMessage;
+    return new IOException(output, e);
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) throws IOException {
+    try {
+      db.put(writeOptions, key, value);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to put key-value to metadata store", e);
+    }
+  }
+
+  @Override
+  public boolean isEmpty() throws IOException {
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      it.seekToFirst();
+      return !it.isValid();
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  @Override
+  public byte[] get(byte[] key) throws IOException {
+    try {
+      return db.get(key);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to get the value for the given key", e);
+    }
+  }
+
+  @Override
+  public void delete(byte[] key) throws IOException {
+    try {
+      db.delete(key);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to delete the given key", e);
+    }
+  }
+
+  @Override
+  public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    List<Map.Entry<byte[], byte[]>> result = new ArrayList<>();
+    long start = System.currentTimeMillis();
+    if (count < 0) {
+      throw new IllegalArgumentException(
+          "Invalid count given " + count + ", count must be greater than 0");
+    }
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      if (startKey == null) {
+        it.seekToFirst();
+      } else {
+        if(get(startKey) == null) {
+          throw new IOException("Invalid start key, not found in current db");
+        }
+        it.seek(startKey);
+      }
+      while(it.isValid() && result.size() < count) {
+        byte[] currentKey = it.key();
+        byte[] currentValue = it.value();
+
+        it.prev();
+        final byte[] prevKey = it.isValid() ? it.key() : null;
+
+        it.seek(currentKey);
+        it.next();
+        final byte[] nextKey = it.isValid() ? it.key() : null;
+
+        if (filters == null || Arrays.asList(filters).stream()
+            .allMatch(entry -> entry.filterKey(prevKey,
+                currentKey, nextKey))) {
+          result.add(new AbstractMap.SimpleImmutableEntry<>(currentKey,
+              currentValue));
+        }
+      }
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+      long end = System.currentTimeMillis();
+      long timeConsumed = end - start;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Time consumed for getRangeKVs() is {}ms,"
+            + " result length is {}.", timeConsumed, result.size());
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public void writeBatch(BatchOperation operation)
+      throws IOException {
+    List<BatchOperation.SingleOperation> operations =
+        operation.getOperations();
+    if (!operations.isEmpty()) {
+      try (WriteBatch writeBatch = new WriteBatch()) {
+        for (BatchOperation.SingleOperation opt : operations) {
+          switch (opt.getOpt()) {
+          case DELETE:
+            writeBatch.remove(opt.getKey());
+            break;
+          case PUT:
+            writeBatch.put(opt.getKey(), opt.getValue());
+            break;
+          default:
+            throw new IllegalArgumentException("Invalid operation "
+                + opt.getOpt());
+          }
+        }
+        db.write(writeOptions, writeBatch);
+      } catch (RocksDBException e) {
+        throw toIOException("Batch write operation failed", e);
+      }
+    }
+  }
+
+  @Override
+  public void compactDB() throws IOException {
+    if (db != null) {
+      try {
+        db.compactRange();
+      } catch (RocksDBException e) {
+        throw toIOException("Failed to compact db", e);
+      }
+    }
+  }
+
+  private void deleteQuietly(File fileOrDir) {
+    if (fileOrDir != null && fileOrDir.exists()) {
+      try {
+        FileUtils.forceDelete(fileOrDir);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete dir {}", fileOrDir.getAbsolutePath(), e);
+      }
+    }
+  }
+
+  @Override
+  public void destroy() throws IOException {
+    // Make sure db is closed.
+    close();
+
+    // There is no destroydb java API available,
+    // equivalently we can delete all db directories.
+    deleteQuietly(dbLocation);
+    deleteQuietly(new File(dbOptions.dbLogDir()));
+    deleteQuietly(new File(dbOptions.walDir()));
+    List<DbPath> dbPaths = dbOptions.dbPaths();
+    if (dbPaths != null) {
+      dbPaths.forEach(dbPath -> {
+        deleteQuietly(new File(dbPath.toString()));
+      });
+    }
+  }
+
+  @Override
+  public ImmutablePair<byte[], byte[]> peekAround(int offset,
+      byte[] from) throws IOException, IllegalArgumentException {
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      if (from == null) {
+        it.seekToFirst();
+      } else {
+        it.seek(from);
+      }
+      if (!it.isValid()) {
+        throw new IOException("Key not found");
+      }
+
+      switch (offset) {
+      case 0:
+        break;
+      case 1:
+        it.next();
+        break;
+      case -1:
+        it.prev();
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Position can only be -1, 0 " + "or 1, but found " + offset);
+      }
+      return it.isValid() ? new ImmutablePair<>(it.key(), it.value()) : null;
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  @Override
+  public void iterate(byte[] from, EntryConsumer consumer)
+      throws IOException {
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      if (from != null) {
+        it.seek(from);
+      } else {
+        it.seekToFirst();
+      }
+      while (it.isValid()) {
+        if (!consumer.consume(it.key(), it.value())) {
+          break;
+        }
+        it.next();
+      }
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (db != null) {
+      db.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0edc4c3d/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index bd29ce4..133bcb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -598,7 +598,7 @@
 
   <property>
     <name>ozone.metastore.impl</name>
-    <value>LevelDB</value>
+    <value>RocksDB</value>
     <description>
       Ozone metadata store implementation. Ozone metadata are well distributed
       to multiple services such as ksm, scm. They are stored in some local

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0edc4c3d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
index 0000e50..143ea94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.utils.BatchOperation;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
@@ -33,6 +34,8 @@ import org.junit.After;
 import org.junit.Test;
 import org.junit.Assert;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
@@ -40,15 +43,35 @@ import java.util.List;
 import java.util.Map;
 import java.util.ArrayList;
 import java.util.UUID;
+import java.util.Collection;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.runners.Parameterized.*;
 
 /**
  * Test class for ozone metadata store.
  */
+@RunWith(Parameterized.class)
 public class TestMetadataStore {
 
+  private final String storeImpl;
+
+  public TestMetadataStore(String metadataImpl) {
+    this.storeImpl = metadataImpl;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB},
+        {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB}
+    });
+  }
+
   private MetadataStore store;
   private File testDir;
-
   private final static int MAX_GETRANGE_LENGTH = 100;
 
   @Rule
@@ -56,11 +79,11 @@ public class TestMetadataStore {
 
   @Before
   public void init() throws IOException {
-    testDir = GenericTestUtils.getTestDir(getClass().getSimpleName());
+    testDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+        + "-" + storeImpl.toLowerCase());
 
     Configuration conf = new OzoneConfiguration();
-    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
-        OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB);
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
 
     store = MetadataStoreBuilder.newBuilder()
         .setConf(conf)
@@ -293,4 +316,72 @@ public class TestMetadataStore {
     expectedException.expectMessage("Invalid start key");
     store.getRangeKVs(getBytes("unknownKey"), MAX_GETRANGE_LENGTH);
   }
+
+  @Test
+  public void testDestroyDB() throws IOException {
+    // create a new DB to test db destroy
+    Configuration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
+
+    File dbDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+        + "-" + storeImpl.toLowerCase() + "-toDestroy");
+    MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(dbDir)
+        .build();
+
+    dbStore.put(getBytes("key1"), getBytes("value1"));
+    dbStore.put(getBytes("key2"), getBytes("value2"));
+
+    Assert.assertFalse(dbStore.isEmpty());
+    Assert.assertTrue(dbDir.exists());
+    Assert.assertTrue(dbDir.listFiles().length > 0);
+
+    dbStore.destroy();
+
+    Assert.assertFalse(dbDir.exists());
+  }
+
+  @Test
+  public void testBatchWrite() throws IOException {
+    Configuration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
+
+    File dbDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+        + "-" + storeImpl.toLowerCase() + "-batchWrite");
+    MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(dbDir)
+        .build();
+
+    List<String> expectedResult = Lists.newArrayList();
+    for (int i = 0; i<10; i++) {
+      dbStore.put(getBytes("batch-" + i), getBytes("batch-value-" + i));
+      expectedResult.add("batch-" + i);
+    }
+
+    BatchOperation batch = new BatchOperation();
+    batch.delete(getBytes("batch-2"));
+    batch.delete(getBytes("batch-3"));
+    batch.delete(getBytes("batch-4"));
+    batch.put(getBytes("batch-new-2"), getBytes("batch-new-value-2"));
+
+    expectedResult.remove("batch-2");
+    expectedResult.remove("batch-3");
+    expectedResult.remove("batch-4");
+    expectedResult.add("batch-new-2");
+
+    dbStore.writeBatch(batch);
+
+    Iterator<String> it = expectedResult.iterator();
+    AtomicInteger count = new AtomicInteger(0);
+    dbStore.iterate(null, (key, value) -> {
+      count.incrementAndGet();
+      return it.hasNext() && it.next().equals(getString(key));
+    });
+
+    Assert.assertEquals(8, count.get());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message