hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From viraj...@apache.org
Subject [30/50] [abbrv] hadoop git commit: HDDS-167. Rename KeySpaceManager to OzoneManager. Contributed by Arpit Agarwal.
Date Mon, 09 Jul 2018 18:26:22 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
new file mode 100644
index 0000000..2d04452
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * This class is for maintaining Ozone Manager statistics.
+ */
+@InterfaceAudience.Private
+@Metrics(about="Ozone Manager Metrics", context="dfs")
+public class OMMetrics {
+  private static final String SOURCE_NAME =
+      OMMetrics.class.getSimpleName();
+
+  // OM request type op metrics
+  private @Metric MutableCounterLong numVolumeOps;
+  private @Metric MutableCounterLong numBucketOps;
+  private @Metric MutableCounterLong numKeyOps;
+
+  // OM op metrics
+  private @Metric MutableCounterLong numVolumeCreates;
+  private @Metric MutableCounterLong numVolumeUpdates;
+  private @Metric MutableCounterLong numVolumeInfos;
+  private @Metric MutableCounterLong numVolumeCheckAccesses;
+  private @Metric MutableCounterLong numBucketCreates;
+  private @Metric MutableCounterLong numVolumeDeletes;
+  private @Metric MutableCounterLong numBucketInfos;
+  private @Metric MutableCounterLong numBucketUpdates;
+  private @Metric MutableCounterLong numBucketDeletes;
+  private @Metric MutableCounterLong numKeyAllocate;
+  private @Metric MutableCounterLong numKeyLookup;
+  private @Metric MutableCounterLong numKeyRenames;
+  private @Metric MutableCounterLong numKeyDeletes;
+  private @Metric MutableCounterLong numBucketLists;
+  private @Metric MutableCounterLong numKeyLists;
+  private @Metric MutableCounterLong numVolumeLists;
+  private @Metric MutableCounterLong numKeyCommits;
+  private @Metric MutableCounterLong numAllocateBlockCalls;
+  private @Metric MutableCounterLong numGetServiceLists;
+
+  // Failure Metrics
+  private @Metric MutableCounterLong numVolumeCreateFails;
+  private @Metric MutableCounterLong numVolumeUpdateFails;
+  private @Metric MutableCounterLong numVolumeInfoFails;
+  private @Metric MutableCounterLong numVolumeDeleteFails;
+  private @Metric MutableCounterLong numBucketCreateFails;
+  private @Metric MutableCounterLong numVolumeCheckAccessFails;
+  private @Metric MutableCounterLong numBucketInfoFails;
+  private @Metric MutableCounterLong numBucketUpdateFails;
+  private @Metric MutableCounterLong numBucketDeleteFails;
+  private @Metric MutableCounterLong numKeyAllocateFails;
+  private @Metric MutableCounterLong numKeyLookupFails;
+  private @Metric MutableCounterLong numKeyRenameFails;
+  private @Metric MutableCounterLong numKeyDeleteFails;
+  private @Metric MutableCounterLong numBucketListFails;
+  private @Metric MutableCounterLong numKeyListFails;
+  private @Metric MutableCounterLong numVolumeListFails;
+  private @Metric MutableCounterLong numKeyCommitFails;
+  private @Metric MutableCounterLong numBlockAllocateCallFails;
+  private @Metric MutableCounterLong numGetServiceListFails;
+
+  public OMMetrics() {
+  }
+
+  public static OMMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME,
+        "Oozne Manager Metrics",
+        new OMMetrics());
+  }
+
+  public void incNumVolumeCreates() {
+    numVolumeOps.incr();
+    numVolumeCreates.incr();
+  }
+
+  public void incNumVolumeUpdates() {
+    numVolumeOps.incr();
+    numVolumeUpdates.incr();
+  }
+
+  public void incNumVolumeInfos() {
+    numVolumeOps.incr();
+    numVolumeInfos.incr();
+  }
+
+  public void incNumVolumeDeletes() {
+    numVolumeOps.incr();
+    numVolumeDeletes.incr();
+  }
+
+  public void incNumVolumeCheckAccesses() {
+    numVolumeOps.incr();
+    numVolumeCheckAccesses.incr();
+  }
+
+  public void incNumBucketCreates() {
+    numBucketOps.incr();
+    numBucketCreates.incr();
+  }
+
+  public void incNumBucketInfos() {
+    numBucketOps.incr();
+    numBucketInfos.incr();
+  }
+
+  public void incNumBucketUpdates() {
+    numBucketOps.incr();
+    numBucketUpdates.incr();
+  }
+
+  public void incNumBucketDeletes() {
+    numBucketOps.incr();
+    numBucketDeletes.incr();
+  }
+
+  public void incNumBucketLists() {
+    numBucketOps.incr();
+    numBucketLists.incr();
+  }
+
+  public void incNumKeyLists() {
+    numKeyOps.incr();
+    numKeyLists.incr();
+  }
+
+  public void incNumVolumeLists() {
+    numVolumeOps.incr();
+    numVolumeLists.incr();
+  }
+
+  public void incNumGetServiceLists() {
+    numGetServiceLists.incr();
+  }
+
+  public void incNumVolumeCreateFails() {
+    numVolumeCreateFails.incr();
+  }
+
+  public void incNumVolumeUpdateFails() {
+    numVolumeUpdateFails.incr();
+  }
+
+  public void incNumVolumeInfoFails() {
+    numVolumeInfoFails.incr();
+  }
+
+  public void incNumVolumeDeleteFails() {
+    numVolumeDeleteFails.incr();
+  }
+
+  public void incNumVolumeCheckAccessFails() {
+    numVolumeCheckAccessFails.incr();
+  }
+
+  public void incNumBucketCreateFails() {
+    numBucketCreateFails.incr();
+  }
+
+  public void incNumBucketInfoFails() {
+    numBucketInfoFails.incr();
+  }
+
+  public void incNumBucketUpdateFails() {
+    numBucketUpdateFails.incr();
+  }
+
+  public void incNumBucketDeleteFails() {
+    numBucketDeleteFails.incr();
+  }
+
+  public void incNumKeyAllocates() {
+    numKeyOps.incr();
+    numKeyAllocate.incr();
+  }
+
+  public void incNumKeyAllocateFails() {
+    numKeyAllocateFails.incr();
+  }
+
+  public void incNumKeyLookups() {
+    numKeyOps.incr();
+    numKeyLookup.incr();
+  }
+
+  public void incNumKeyLookupFails() {
+    numKeyLookupFails.incr();
+  }
+
+  public void incNumKeyRenames() {
+    numKeyOps.incr();
+    numKeyRenames.incr();
+  }
+
+  public void incNumKeyRenameFails() {
+    numKeyOps.incr();
+    numKeyRenameFails.incr();
+  }
+
+  public void incNumKeyDeleteFails() {
+    numKeyDeleteFails.incr();
+  }
+
+  public void incNumKeyDeletes() {
+    numKeyOps.incr();
+    numKeyDeletes.incr();
+  }
+
+  public void incNumKeyCommits() {
+    numKeyOps.incr();
+    numKeyCommits.incr();
+  }
+
+  public void incNumKeyCommitFails() {
+    numKeyCommitFails.incr();
+  }
+
+  public void incNumBlockAllocateCalls() {
+    numAllocateBlockCalls.incr();
+  }
+
+  public void incNumBlockAllocateCallFails() {
+    numBlockAllocateCallFails.incr();
+  }
+
+  public void incNumBucketListFails() {
+    numBucketListFails.incr();
+  }
+
+  public void incNumKeyListFails() {
+    numKeyListFails.incr();
+  }
+
+  public void incNumVolumeListFails() {
+    numVolumeListFails.incr();
+  }
+
+  public void incNumGetServiceListFails() {
+    numGetServiceListFails.incr();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeCreates() {
+    return numVolumeCreates.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeUpdates() {
+    return numVolumeUpdates.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeInfos() {
+    return numVolumeInfos.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeDeletes() {
+    return numVolumeDeletes.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeCheckAccesses() {
+    return numVolumeCheckAccesses.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBucketCreates() {
+    return numBucketCreates.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBucketInfos() {
+    return numBucketInfos.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBucketUpdates() {
+    return numBucketUpdates.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBucketDeletes() {
+    return numBucketDeletes.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBucketLists() {
+    return numBucketLists.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeLists() {
+    return numVolumeLists.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyLists() {
+    return numKeyLists.value();
+  }
+
+  @VisibleForTesting
+  public long getNumGetServiceLists() {
+    return numGetServiceLists.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeCreateFails() {
+    return numVolumeCreateFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeUpdateFails() {
+    return numVolumeUpdateFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeInfoFails() {
+    return numVolumeInfoFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeDeleteFails() {
+    return numVolumeDeleteFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeCheckAccessFails() {
+    return numVolumeCheckAccessFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBucketCreateFails() {
+    return numBucketCreateFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBucketInfoFails() {
+    return numBucketInfoFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBucketUpdateFails() {
+    return numBucketUpdateFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBucketDeleteFails() {
+    return numBucketDeleteFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyAllocates() {
+    return numKeyAllocate.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyAllocateFails() {
+    return numKeyAllocateFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyLookups() {
+    return numKeyLookup.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyLookupFails() {
+    return numKeyLookupFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyRenames() {
+    return numKeyRenames.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyRenameFails() {
+    return numKeyRenameFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyDeletes() {
+    return numKeyDeletes.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyDeletesFails() {
+    return numKeyDeleteFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBucketListFails() {
+    return numBucketListFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyListFails() {
+    return numKeyListFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumVolumeListFails() {
+    return numVolumeListFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyCommits() {
+    return numKeyCommits.value();
+  }
+
+  @VisibleForTesting
+  public long getNumKeyCommitFails() {
+    return numKeyCommitFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBlockAllocates() {
+    return numAllocateBlockCalls.value();
+  }
+
+  @VisibleForTesting
+  public long getNumBlockAllocateFails() {
+    return numBlockAllocateCallFails.value();
+  }
+
+  @VisibleForTesting
+  public long getNumGetServiceListFails() {
+    return numGetServiceListFails.value();
+  }
+
+  public void unRegister() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java
new file mode 100644
index 0000000..3820aed
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.common.Storage;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
+
+import static org.apache.hadoop.ozone.OzoneConsts.SCM_ID;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+
+/**
+ * OMStorage is responsible for management of the StorageDirectories used by
+ * the Ozone Manager.
+ */
+public class OMStorage extends Storage {
+
+  public static final String STORAGE_DIR = "om";
+  public static final String OM_ID = "omUuid";
+
+  /**
+   * Construct OMStorage.
+   * @throws IOException if any directories are inaccessible.
+   */
+  public OMStorage(OzoneConfiguration conf) throws IOException {
+    super(NodeType.OM, getOzoneMetaDirPath(conf), STORAGE_DIR);
+  }
+
+  public void setScmId(String scmId) throws IOException {
+    if (getState() == StorageState.INITIALIZED) {
+      throw new IOException("OM is already initialized.");
+    } else {
+      getStorageInfo().setProperty(SCM_ID, scmId);
+    }
+  }
+
+  public void setOmId(String omId) throws IOException {
+    if (getState() == StorageState.INITIALIZED) {
+      throw new IOException("OM is already initialized.");
+    } else {
+      getStorageInfo().setProperty(OM_ID, omId);
+    }
+  }
+
+  /**
+   * Retrieves the SCM ID from the version file.
+   * @return SCM_ID
+   */
+  public String getScmId() {
+    return getStorageInfo().getProperty(SCM_ID);
+  }
+
+  /**
+   * Retrieves the OM ID from the version file.
+   * @return OM_ID
+   */
+  public String getOmId() {
+    return getStorageInfo().getProperty(OM_ID);
+  }
+
+  @Override
+  protected Properties getNodeProperties() {
+    String omId = getOmId();
+    if (omId == null) {
+      omId = UUID.randomUUID().toString();
+    }
+    Properties omProperties = new Properties();
+    omProperties.setProperty(OM_ID, omId);
+    return omProperties;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
new file mode 100644
index 0000000..21d2411
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -0,0 +1,526 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
+
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataKeyFilters;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_ID_DELIMINATOR;
+import static org.apache.hadoop.ozone.OzoneConsts.OPEN_KEY_PREFIX;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+    .OZONE_OM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+    .OZONE_OM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+
+/**
+ * Ozone metadata manager interface.
+ */
+public class OmMetadataManagerImpl implements OMMetadataManager {
+
+  private final MetadataStore store;
+  private final ReadWriteLock lock;
+  private final long openKeyExpireThresholdMS;
+
+  public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
+    File metaDir = getOzoneMetaDirPath(conf);
+    final int cacheSize = conf.getInt(OZONE_OM_DB_CACHE_SIZE_MB,
+        OZONE_OM_DB_CACHE_SIZE_DEFAULT);
+    File omDBFile = new File(metaDir.getPath(), OM_DB_NAME);
+    this.store = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setDbFile(omDBFile)
+        .setCacheSize(cacheSize * OzoneConsts.MB)
+        .build();
+    this.lock = new ReentrantReadWriteLock();
+    this.openKeyExpireThresholdMS = 1000 * conf.getInt(
+        OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS,
+        OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT);
+  }
+
+  /**
+   * Start metadata manager.
+   */
+  @Override
+  public void start() {
+
+  }
+
+  /**
+   * Stop metadata manager.
+   */
+  @Override
+  public void stop() throws IOException {
+    if (store != null) {
+      store.close();
+    }
+  }
+
+  /**
+   * Get metadata store.
+   * @return store - metadata store.
+   */
+  @VisibleForTesting
+  @Override
+  public MetadataStore getStore() {
+    return store;
+  }
+
+  /**
+   * Given a volume return the corresponding DB key.
+   * @param volume - Volume name
+   */
+  public byte[] getVolumeKey(String volume) {
+    String dbVolumeName = OzoneConsts.OM_VOLUME_PREFIX + volume;
+    return DFSUtil.string2Bytes(dbVolumeName);
+  }
+
+  /**
+   * Given a user return the corresponding DB key.
+   * @param user - User name
+   */
+  public byte[] getUserKey(String user) {
+    String dbUserName = OzoneConsts.OM_USER_PREFIX + user;
+    return DFSUtil.string2Bytes(dbUserName);
+  }
+
+  /**
+   * Given a volume and bucket, return the corresponding DB key.
+   * @param volume - User name
+   * @param bucket - Bucket name
+   */
+  public byte[] getBucketKey(String volume, String bucket) {
+    String bucketKeyString = OzoneConsts.OM_VOLUME_PREFIX + volume
+        + OzoneConsts.OM_BUCKET_PREFIX + bucket;
+    return DFSUtil.string2Bytes(bucketKeyString);
+  }
+
+  /**
+   * @param volume
+   * @param bucket
+   * @return
+   */
+  private String getBucketWithDBPrefix(String volume, String bucket) {
+    StringBuffer sb = new StringBuffer();
+    sb.append(OzoneConsts.OM_VOLUME_PREFIX)
+        .append(volume)
+        .append(OzoneConsts.OM_BUCKET_PREFIX);
+    if (!Strings.isNullOrEmpty(bucket)) {
+      sb.append(bucket);
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public String getKeyWithDBPrefix(String volume, String bucket, String key) {
+    String keyVB = OzoneConsts.OM_KEY_PREFIX + volume
+        + OzoneConsts.OM_KEY_PREFIX + bucket
+        + OzoneConsts.OM_KEY_PREFIX;
+    return Strings.isNullOrEmpty(key) ? keyVB : keyVB + key;
+  }
+
+  @Override
+  public byte[] getDBKeyBytes(String volume, String bucket, String key) {
+    return DFSUtil.string2Bytes(getKeyWithDBPrefix(volume, bucket, key));
+  }
+
+  @Override
+  public byte[] getDeletedKeyName(byte[] keyName) {
+    return DFSUtil.string2Bytes(
+        DELETING_KEY_PREFIX + DFSUtil.bytes2String(keyName));
+  }
+
+  @Override
+  public byte[] getOpenKeyNameBytes(String keyName, int id) {
+    return DFSUtil.string2Bytes(OPEN_KEY_PREFIX + id +
+        OPEN_KEY_ID_DELIMINATOR + keyName);
+  }
+
+  /**
+   * Returns the read lock used on Metadata DB.
+   * @return readLock
+   */
+  @Override
+  public Lock readLock() {
+    return lock.readLock();
+  }
+
+  /**
+   * Returns the write lock used on Metadata DB.
+   * @return writeLock
+   */
+  @Override
+  public Lock writeLock() {
+    return lock.writeLock();
+  }
+
+  /**
+   * Returns the value associated with this key.
+   * @param key - key
+   * @return value
+   */
+  @Override
+  public byte[] get(byte[] key) throws IOException {
+    return store.get(key);
+  }
+
+  /**
+   * Puts a Key into Metadata DB.
+   * @param key   - key
+   * @param value - value
+   */
+  @Override
+  public void put(byte[] key, byte[] value) throws IOException {
+    store.put(key, value);
+  }
+
+  /**
+   * Deletes a Key from Metadata DB.
+   * @param key   - key
+   */
+  public void delete(byte[] key) throws IOException {
+    store.delete(key);
+  }
+
+  @Override
+  public void writeBatch(BatchOperation batch) throws IOException {
+    this.store.writeBatch(batch);
+  }
+
+  /**
+   * Given a volume, check if it is empty, i.e there are no buckets inside it.
+   * @param volume - Volume name
+   * @return true if the volume is empty
+   */
+  public boolean isVolumeEmpty(String volume) throws IOException {
+    String dbVolumeRootName = OzoneConsts.OM_VOLUME_PREFIX + volume
+        + OzoneConsts.OM_BUCKET_PREFIX;
+    byte[] dbVolumeRootKey = DFSUtil.string2Bytes(dbVolumeRootName);
+    ImmutablePair<byte[], byte[]> volumeRoot =
+        store.peekAround(0, dbVolumeRootKey);
+    if (volumeRoot != null) {
+      return !DFSUtil.bytes2String(volumeRoot.getKey())
+          .startsWith(dbVolumeRootName);
+    }
+    return true;
+  }
+
+  /**
+   * Given a volume/bucket, check if it is empty,
+   * i.e there are no keys inside it.
+   * @param volume - Volume name
+   * @param bucket - Bucket name
+   * @return true if the bucket is empty
+   */
+  public boolean isBucketEmpty(String volume, String bucket)
+      throws IOException {
+    String keyRootName = getKeyWithDBPrefix(volume, bucket, null);
+    byte[] keyRoot = DFSUtil.string2Bytes(keyRootName);
+    ImmutablePair<byte[], byte[]> firstKey = store.peekAround(0, keyRoot);
+    if (firstKey != null) {
+      return !DFSUtil.bytes2String(firstKey.getKey())
+          .startsWith(keyRootName);
+    }
+    return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<OmBucketInfo> listBuckets(final String volumeName,
+                                        final String startBucket, final String bucketPrefix,
+                                        final int maxNumOfBuckets) throws IOException {
+    List<OmBucketInfo> result = new ArrayList<>();
+    if (Strings.isNullOrEmpty(volumeName)) {
+      throw new OMException("Volume name is required.",
+          ResultCodes.FAILED_VOLUME_NOT_FOUND);
+    }
+
+    byte[] volumeNameBytes = getVolumeKey(volumeName);
+    if (store.get(volumeNameBytes) == null) {
+      throw new OMException("Volume " + volumeName + " not found.",
+          ResultCodes.FAILED_VOLUME_NOT_FOUND);
+    }
+
+
+    // A bucket starts with /#volume/#bucket_prefix
+    MetadataKeyFilter filter = (preKey, currentKey, nextKey) -> {
+      if (currentKey != null) {
+        String bucketNamePrefix =
+                getBucketWithDBPrefix(volumeName, bucketPrefix);
+        String bucket = DFSUtil.bytes2String(currentKey);
+        return bucket.startsWith(bucketNamePrefix);
+      }
+      return false;
+    };
+
+    List<Map.Entry<byte[], byte[]>> rangeResult;
+    if (!Strings.isNullOrEmpty(startBucket)) {
+      // Since we are excluding start key from the result,
+      // the maxNumOfBuckets is incremented.
+      rangeResult = store.getSequentialRangeKVs(
+          getBucketKey(volumeName, startBucket),
+          maxNumOfBuckets + 1, filter);
+      if (!rangeResult.isEmpty()) {
+        //Remove start key from result.
+        rangeResult.remove(0);
+      }
+    } else {
+      rangeResult = store.getSequentialRangeKVs(null, maxNumOfBuckets, filter);
+    }
+
+    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
+      OmBucketInfo info = OmBucketInfo.getFromProtobuf(
+          BucketInfo.parseFrom(entry.getValue()));
+      result.add(info);
+    }
+    return result;
+  }
+
+  @Override
+  public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
+                                  String startKey, String keyPrefix, int maxKeys) throws IOException {
+    List<OmKeyInfo> result = new ArrayList<>();
+    if (Strings.isNullOrEmpty(volumeName)) {
+      throw new OMException("Volume name is required.",
+          ResultCodes.FAILED_VOLUME_NOT_FOUND);
+    }
+
+    if (Strings.isNullOrEmpty(bucketName)) {
+      throw new OMException("Bucket name is required.",
+          ResultCodes.FAILED_BUCKET_NOT_FOUND);
+    }
+
+    byte[] bucketNameBytes = getBucketKey(volumeName, bucketName);
+    if (store.get(bucketNameBytes) == null) {
+      throw new OMException("Bucket " + bucketName + " not found.",
+          ResultCodes.FAILED_BUCKET_NOT_FOUND);
+    }
+
+    MetadataKeyFilter filter = new KeyPrefixFilter()
+        .addFilter(getKeyWithDBPrefix(volumeName, bucketName, keyPrefix));
+
+    List<Map.Entry<byte[], byte[]>> rangeResult;
+    if (!Strings.isNullOrEmpty(startKey)) {
+      //Since we are excluding start key from the result,
+      // the maxNumOfBuckets is incremented.
+      rangeResult = store.getSequentialRangeKVs(
+          getDBKeyBytes(volumeName, bucketName, startKey),
+          maxKeys + 1, filter);
+      if (!rangeResult.isEmpty()) {
+        //Remove start key from result.
+        rangeResult.remove(0);
+      }
+    } else {
+      rangeResult = store.getSequentialRangeKVs(null, maxKeys, filter);
+    }
+
+    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
+      OmKeyInfo info = OmKeyInfo.getFromProtobuf(
+          KeyInfo.parseFrom(entry.getValue()));
+      result.add(info);
+    }
+    return result;
+  }
+
+  @Override
+  public List<OmVolumeArgs> listVolumes(String userName,
+                                        String prefix, String startKey, int maxKeys) throws IOException {
+    List<OmVolumeArgs> result = Lists.newArrayList();
+    VolumeList volumes;
+    if (Strings.isNullOrEmpty(userName)) {
+      volumes = getAllVolumes();
+    } else {
+      volumes = getVolumesByUser(userName);
+    }
+
+    if (volumes == null || volumes.getVolumeNamesCount() == 0) {
+      return result;
+    }
+
+    boolean startKeyFound = Strings.isNullOrEmpty(startKey);
+    for (String volumeName : volumes.getVolumeNamesList()) {
+      if (!Strings.isNullOrEmpty(prefix)) {
+        if (!volumeName.startsWith(prefix)) {
+          continue;
+        }
+      }
+
+      if (!startKeyFound && volumeName.equals(startKey)) {
+        startKeyFound = true;
+        continue;
+      }
+      if (startKeyFound && result.size() < maxKeys) {
+        byte[] volumeInfo = store.get(this.getVolumeKey(volumeName));
+        if (volumeInfo == null) {
+          // Could not get volume info by given volume name,
+          // since the volume name is loaded from db,
+          // this probably means om db is corrupted or some entries are
+          // accidentally removed.
+          throw new OMException("Volume info not found for " + volumeName,
+              ResultCodes.FAILED_VOLUME_NOT_FOUND);
+        }
+        VolumeInfo info = VolumeInfo.parseFrom(volumeInfo);
+        OmVolumeArgs volumeArgs = OmVolumeArgs.getFromProtobuf(info);
+        result.add(volumeArgs);
+      }
+    }
+
+    return result;
+  }
+
+  private VolumeList getVolumesByUser(String userName)
+      throws OMException {
+    return getVolumesByUser(getUserKey(userName));
+  }
+
+  private VolumeList getVolumesByUser(byte[] userNameKey)
+      throws OMException {
+    VolumeList volumes = null;
+    try {
+      byte[] volumesInBytes = store.get(userNameKey);
+      if (volumesInBytes == null) {
+        // No volume found for this user, return an empty list
+        return VolumeList.newBuilder().build();
+      }
+      volumes = VolumeList.parseFrom(volumesInBytes);
+    } catch (IOException e) {
+      throw new OMException("Unable to get volumes info by the given user, "
+          + "metadata might be corrupted", e,
+          ResultCodes.FAILED_METADATA_ERROR);
+    }
+    return volumes;
+  }
+
+  private VolumeList getAllVolumes() throws IOException {
+    // Scan all users in database
+    KeyPrefixFilter filter =
+        new KeyPrefixFilter().addFilter(OzoneConsts.OM_USER_PREFIX);
+    // We are not expecting a huge number of users per cluster,
+    // it should be fine to scan all users in db and return us a
+    // list of volume names in string per user.
+    List<Map.Entry<byte[], byte[]>> rangeKVs = store
+        .getSequentialRangeKVs(null, Integer.MAX_VALUE, filter);
+
+    VolumeList.Builder builder = VolumeList.newBuilder();
+    for (Map.Entry<byte[], byte[]> entry : rangeKVs) {
+      VolumeList volumes = this.getVolumesByUser(entry.getKey());
+      builder.addAllVolumeNames(volumes.getVolumeNamesList());
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public List<BlockGroup> getPendingDeletionKeys(final int count)
+      throws IOException {
+    List<BlockGroup> keyBlocksList = Lists.newArrayList();
+    List<Map.Entry<byte[], byte[]>> rangeResult =
+        store.getRangeKVs(null, count,
+            MetadataKeyFilters.getDeletingKeyFilter());
+    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
+      OmKeyInfo info =
+          OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
+      // Get block keys as a list.
+      OmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
+      if (latest == null) {
+        return Collections.emptyList();
+      }
+      List<BlockID> item = latest.getLocationList().stream()
+          .map(b->new BlockID(b.getContainerID(), b.getLocalID()))
+          .collect(Collectors.toList());
+      BlockGroup keyBlocks = BlockGroup.newBuilder()
+          .setKeyName(DFSUtil.bytes2String(entry.getKey()))
+          .addAllBlockIDs(item)
+          .build();
+      keyBlocksList.add(keyBlocks);
+    }
+    return keyBlocksList;
+  }
+
+  @Override
+  public List<BlockGroup> getExpiredOpenKeys() throws IOException {
+    List<BlockGroup> keyBlocksList = Lists.newArrayList();
+    long now = Time.now();
+    final MetadataKeyFilter openKeyFilter =
+        new KeyPrefixFilter().addFilter(OPEN_KEY_PREFIX);
+    List<Map.Entry<byte[], byte[]>> rangeResult =
+        store.getSequentialRangeKVs(null, Integer.MAX_VALUE,
+            openKeyFilter);
+    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
+      OmKeyInfo info =
+          OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
+      long lastModify = info.getModificationTime();
+      if (now - lastModify < this.openKeyExpireThresholdMS) {
+        // consider as may still be active, not hanging.
+        continue;
+      }
+      // Get block keys as a list.
+      List<BlockID> item = info.getLatestVersionLocations()
+          .getBlocksLatestVersionOnly().stream()
+          .map(b->new BlockID(b.getContainerID(), b.getLocalID()))
+          .collect(Collectors.toList());
+      BlockGroup keyBlocks = BlockGroup.newBuilder()
+          .setKeyName(DFSUtil.bytes2String(entry.getKey()))
+          .addAllBlockIDs(item)
+          .build();
+      keyBlocksList.add(keyBlocks);
+    }
+    return keyBlocksList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java
new file mode 100644
index 0000000..8d94f5a
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OpenKeyCleanupService.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is the background service to delete hanging open keys.
+ * Scan the metadata of om periodically to get
+ * the keys with prefix "#open#" and ask scm to
+ * delete metadata accordingly, if scm returns
+ * success for keys, then clean up those keys.
+ */
+public class OpenKeyCleanupService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OpenKeyCleanupService.class);
+
+  private final static int OPEN_KEY_DELETING_CORE_POOL_SIZE = 2;
+
+  private final KeyManager keyManager;
+  private final ScmBlockLocationProtocol scmClient;
+
+  public OpenKeyCleanupService(ScmBlockLocationProtocol scmClient,
+      KeyManager keyManager, int serviceInterval,
+      long serviceTimeout) {
+    super("OpenKeyCleanupService", serviceInterval, TimeUnit.SECONDS,
+        OPEN_KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
+    this.keyManager = keyManager;
+    this.scmClient = scmClient;
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new OpenKeyDeletingTask());
+    return queue;
+  }
+
+  private class OpenKeyDeletingTask
+      implements BackgroundTask<BackgroundTaskResult> {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      try {
+        List<BlockGroup> keyBlocksList = keyManager.getExpiredOpenKeys();
+        if (keyBlocksList.size() > 0) {
+          int toDeleteSize = keyBlocksList.size();
+          LOG.debug("Found {} to-delete open keys in OM", toDeleteSize);
+          List<DeleteBlockGroupResult> results =
+              scmClient.deleteKeyBlocks(keyBlocksList);
+          int deletedSize = 0;
+          for (DeleteBlockGroupResult result : results) {
+            if (result.isSuccess()) {
+              try {
+                keyManager.deleteExpiredOpenKey(result.getObjectKey());
+                LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
+                deletedSize += 1;
+              } catch (IOException e) {
+                LOG.warn("Failed to delete hanging-open key {}",
+                    result.getObjectKey(), e);
+              }
+            } else {
+              LOG.warn("Deleting open Key {} failed because some of the blocks"
+                      + " were failed to delete, failed blocks: {}",
+                  result.getObjectKey(),
+                  StringUtils.join(",", result.getFailedBlocks()));
+            }
+          }
+          LOG.info("Found {} expired open key entries, successfully " +
+              "cleaned up {} entries", toDeleteSize, deletedSize);
+          return results::size;
+        } else {
+          LOG.debug("No hanging open key found in OM");
+        }
+      } catch (IOException e) {
+        LOG.error("Unable to get hanging open keys, retry in"
+            + " next interval", e);
+      }
+      return BackgroundTaskResult.EmptyTaskResult.newResult();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
new file mode 100644
index 0000000..71fa921
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -0,0 +1,911 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
+import org.apache.hadoop.ozone.common.Storage.StorageState;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .ServicePort;
+import org.apache.hadoop.ozone.protocol.proto
+    .OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB
+    .ScmBlockLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.protocolPB
+    .StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.StringUtils;
+
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
+import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
+import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
+import static org.apache.hadoop.ozone.OmUtils.getOmAddress;
+import static org.apache.hadoop.hdds.server.ServerUtils
+    .updateRPCListenAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+    .OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+    .OZONE_OM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+    .OZONE_OM_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.ozone.protocol.proto
+    .OzoneManagerProtocolProtos.OzoneManagerService
+    .newReflectiveBlockingService;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
+    .NodeState.HEALTHY;
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+/**
+ * Ozone Manager is the metadata manager of ozone.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
+public final class OzoneManager extends ServiceRuntimeInfoImpl
+    implements OzoneManagerProtocol, OMMXBean {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneManager.class);
+
+  private static final String USAGE =
+      "Usage: \n ozone om [genericOptions] " + "[ "
+          + StartupOption.CREATEOBJECTSTORE.getName() + " ]\n " + "ozone om [ "
+          + StartupOption.HELP.getName() + " ]\n";
+
+  /** Startup options. */
+  public enum StartupOption {
+    CREATEOBJECTSTORE("-createObjectStore"),
+    HELP("-help"),
+    REGULAR("-regular");
+
+    private final String name;
+
+    StartupOption(String arg) {
+      this.name = arg;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public static StartupOption parse(String value) {
+      for (StartupOption option : StartupOption.values()) {
+        if (option.name.equalsIgnoreCase(value)) {
+          return option;
+        }
+      }
+      return null;
+    }
+  }
+
+  private final OzoneConfiguration configuration;
+  private final RPC.Server omRpcServer;
+  private final InetSocketAddress omRpcAddress;
+  private final OMMetadataManager metadataManager;
+  private final VolumeManager volumeManager;
+  private final BucketManager bucketManager;
+  private final KeyManager keyManager;
+  private final OMMetrics metrics;
+  private final OzoneManagerHttpServer httpServer;
+  private final OMStorage omStorage;
+  private final ScmBlockLocationProtocol scmBlockClient;
+  private final StorageContainerLocationProtocol scmContainerClient;
+  private ObjectName omInfoBeanName;
+
+  private OzoneManager(OzoneConfiguration conf) throws IOException {
+    Preconditions.checkNotNull(conf);
+    configuration = conf;
+    omStorage = new OMStorage(conf);
+    scmBlockClient = getScmBlockClient(configuration);
+    scmContainerClient = getScmContainerClient(configuration);
+    if (omStorage.getState() != StorageState.INITIALIZED) {
+      throw new OMException("OM not initialized.",
+          ResultCodes.OM_NOT_INITIALIZED);
+    }
+
+    // verifies that the SCM info in the OM Version file is correct.
+    ScmInfo scmInfo = scmBlockClient.getScmInfo();
+    if (!(scmInfo.getClusterId().equals(omStorage.getClusterID()) && scmInfo
+        .getScmId().equals(omStorage.getScmId()))) {
+      throw new OMException("SCM version info mismatch.",
+          ResultCodes.SCM_VERSION_MISMATCH_ERROR);
+    }
+    final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
+        OZONE_OM_HANDLER_COUNT_DEFAULT);
+
+    RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
+        ProtobufRpcEngine.class);
+
+    BlockingService omService = newReflectiveBlockingService(
+        new OzoneManagerProtocolServerSideTranslatorPB(this));
+    final InetSocketAddress omNodeRpcAddr =
+        getOmAddress(configuration);
+    omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
+        OzoneManagerProtocolPB.class, omService,
+        handlerCount);
+    omRpcAddress = updateRPCListenAddress(configuration,
+        OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
+    metadataManager = new OmMetadataManagerImpl(configuration);
+    volumeManager = new VolumeManagerImpl(metadataManager, configuration);
+    bucketManager = new BucketManagerImpl(metadataManager);
+    metrics = OMMetrics.create();
+    keyManager =
+        new KeyManagerImpl(scmBlockClient, metadataManager, configuration,
+            omStorage.getOmId());
+    httpServer = new OzoneManagerHttpServer(configuration, this);
+  }
+
+  /**
+   * Create a scm block client, used by putKey() and getKey().
+   *
+   * @return {@link ScmBlockLocationProtocol}
+   * @throws IOException
+   */
+  private static ScmBlockLocationProtocol getScmBlockClient(
+      OzoneConfiguration conf) throws IOException {
+    RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
+        ProtobufRpcEngine.class);
+    long scmVersion =
+        RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
+    InetSocketAddress scmBlockAddress =
+        getScmAddressForBlockClients(conf);
+    ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
+        new ScmBlockLocationProtocolClientSideTranslatorPB(
+            RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
+                scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
+                NetUtils.getDefaultSocketFactory(conf),
+                Client.getRpcTimeout(conf)));
+    return scmBlockLocationClient;
+  }
+
+  /**
+   * Returns a scm container client.
+   *
+   * @return {@link StorageContainerLocationProtocol}
+   * @throws IOException
+   */
+  private static StorageContainerLocationProtocol getScmContainerClient(
+      OzoneConfiguration conf) throws IOException {
+    RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
+        ProtobufRpcEngine.class);
+    long scmVersion =
+        RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
+    InetSocketAddress scmAddr = getScmAddressForClients(
+        conf);
+    StorageContainerLocationProtocolClientSideTranslatorPB scmContainerClient =
+        new StorageContainerLocationProtocolClientSideTranslatorPB(
+            RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
+                scmAddr, UserGroupInformation.getCurrentUser(), conf,
+                NetUtils.getDefaultSocketFactory(conf),
+                Client.getRpcTimeout(conf)));
+    return scmContainerClient;
+  }
+
+  @VisibleForTesting
+  public KeyManager getKeyManager() {
+    return keyManager;
+  }
+
+  @VisibleForTesting
+  public ScmInfo getScmInfo() throws IOException {
+    return scmBlockClient.getScmInfo();
+  }
+
+  @VisibleForTesting
+  public OMStorage getOmStorage() {
+    return omStorage;
+  }
+  /**
+   * Starts an RPC server, if configured.
+   *
+   * @param conf configuration
+   * @param addr configured address of RPC server
+   * @param protocol RPC protocol provided by RPC server
+   * @param instance RPC protocol implementation instance
+   * @param handlerCount RPC server handler count
+   *
+   * @return RPC server
+   * @throws IOException if there is an I/O error while creating RPC server
+   */
+  private static RPC.Server startRpcServer(OzoneConfiguration conf,
+      InetSocketAddress addr, Class<?> protocol, BlockingService instance,
+      int handlerCount) throws IOException {
+    RPC.Server rpcServer = new RPC.Builder(conf)
+        .setProtocol(protocol)
+        .setInstance(instance)
+        .setBindAddress(addr.getHostString())
+        .setPort(addr.getPort())
+        .setNumHandlers(handlerCount)
+        .setVerbose(false)
+        .setSecretManager(null)
+        .build();
+
+    DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
+    return rpcServer;
+  }
+
+  /**
+   * Get metadata manager.
+   * @return metadata manager.
+   */
+  public OMMetadataManager getMetadataManager() {
+    return metadataManager;
+  }
+
+  public OMMetrics getMetrics() {
+    return metrics;
+  }
+
+  /**
+   * Main entry point for starting OzoneManager.
+   *
+   * @param argv arguments
+   * @throws IOException if startup fails due to I/O error
+   */
+  public static void main(String[] argv) throws IOException {
+    if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) {
+      System.exit(0);
+    }
+    try {
+      OzoneConfiguration conf = new OzoneConfiguration();
+      GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
+      if (!hParser.isParseSuccessful()) {
+        System.err.println("USAGE: " + USAGE + " \n");
+        hParser.printGenericCommandUsage(System.err);
+        System.exit(1);
+      }
+      StringUtils.startupShutdownMessage(OzoneManager.class, argv, LOG);
+      OzoneManager om = createOm(hParser.getRemainingArgs(), conf);
+      if (om != null) {
+        om.start();
+        om.join();
+      }
+    } catch (Throwable t) {
+      LOG.error("Failed to start the OzoneManager.", t);
+      terminate(1, t);
+    }
+  }
+
+  private static void printUsage(PrintStream out) {
+    out.println(USAGE + "\n");
+  }
+
+  /**
+   * Constructs OM instance based on command line arguments.
+   * @param argv Command line arguments
+   * @param conf OzoneConfiguration
+   * @return OM instance
+   * @throws IOException in case OM instance creation fails.
+   */
+
+  public static OzoneManager createOm(String[] argv,
+                                      OzoneConfiguration conf) throws IOException {
+    if (!isHddsEnabled(conf)) {
+      System.err.println("OM cannot be started in secure mode or when " +
+          OZONE_ENABLED + " is set to false");
+      System.exit(1);
+    }
+    StartupOption startOpt = parseArguments(argv);
+    if (startOpt == null) {
+      printUsage(System.err);
+      terminate(1);
+      return null;
+    }
+    switch (startOpt) {
+    case CREATEOBJECTSTORE:
+      terminate(omInit(conf) ? 0 : 1);
+      return null;
+    case HELP:
+      printUsage(System.err);
+      terminate(0);
+      return null;
+    default:
+      return new OzoneManager(conf);
+    }
+  }
+
+  /**
+   * Initializes the OM instance.
+   * @param conf OzoneConfiguration
+   * @return true if OM initialization succeeds, false otherwise
+   * @throws IOException in case ozone metadata directory path is not accessible
+   */
+
+  private static boolean omInit(OzoneConfiguration conf) throws IOException {
+    OMStorage omStorage = new OMStorage(conf);
+    StorageState state = omStorage.getState();
+    if (state != StorageState.INITIALIZED) {
+      try {
+        ScmBlockLocationProtocol scmBlockClient = getScmBlockClient(conf);
+        ScmInfo scmInfo = scmBlockClient.getScmInfo();
+        String clusterId = scmInfo.getClusterId();
+        String scmId = scmInfo.getScmId();
+        if (clusterId == null || clusterId.isEmpty()) {
+          throw new IOException("Invalid Cluster ID");
+        }
+        if (scmId == null || scmId.isEmpty()) {
+          throw new IOException("Invalid SCM ID");
+        }
+        omStorage.setClusterId(clusterId);
+        omStorage.setScmId(scmId);
+        omStorage.initialize();
+        System.out.println(
+            "OM initialization succeeded.Current cluster id for sd="
+                + omStorage.getStorageDir() + ";cid=" + omStorage
+                .getClusterID());
+        return true;
+      } catch (IOException ioe) {
+        LOG.error("Could not initialize OM version file", ioe);
+        return false;
+      }
+    } else {
+      System.out.println(
+          "OM already initialized.Reusing existing cluster id for sd="
+              + omStorage.getStorageDir() + ";cid=" + omStorage
+              .getClusterID());
+      return true;
+    }
+  }
+
+  /**
+   * Parses the command line options for OM initialization.
+   * @param args command line arguments
+   * @return StartupOption if options are valid, null otherwise
+   */
+  private static StartupOption parseArguments(String[] args) {
+    if (args == null || args.length == 0) {
+      return StartupOption.REGULAR;
+    } else if (args.length == 1) {
+      return StartupOption.parse(args[0]);
+    }
+    return null;
+  }
+
+  /**
+   * Builds a message for logging startup information about an RPC server.
+   *
+   * @param description RPC server description
+   * @param addr RPC server listening address
+   * @return server startup message
+   */
+  private static String buildRpcServerStartMessage(String description,
+      InetSocketAddress addr) {
+    return addr != null ? String.format("%s is listening at %s",
+        description, addr.toString()) :
+        String.format("%s not started", description);
+  }
+
+  /**
+   * Start service.
+   */
+  public void start() throws IOException {
+    LOG.info(buildRpcServerStartMessage("OzoneManager RPC server",
+        omRpcAddress));
+    DefaultMetricsSystem.initialize("OzoneManager");
+    metadataManager.start();
+    keyManager.start();
+    omRpcServer.start();
+    httpServer.start();
+    registerMXBean();
+    setStartTime();
+  }
+
+  /**
+   * Stop service.
+   */
+  public void stop() {
+    try {
+      metadataManager.stop();
+      omRpcServer.stop();
+      keyManager.stop();
+      httpServer.stop();
+      metrics.unRegister();
+      unregisterMXBean();
+    } catch (Exception e) {
+      LOG.error("OzoneManager stop failed.", e);
+    }
+  }
+
+  /**
+   * Wait until service has completed shutdown.
+   */
+  public void join() {
+    try {
+      omRpcServer.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.info("Interrupted during OzoneManager join.", e);
+    }
+  }
+
+  /**
+   * Creates a volume.
+   *
+   * @param args - Arguments to create Volume.
+   * @throws IOException
+   */
+  @Override
+  public void createVolume(OmVolumeArgs args) throws IOException {
+    try {
+      metrics.incNumVolumeCreates();
+      volumeManager.createVolume(args);
+    } catch (Exception ex) {
+      metrics.incNumVolumeCreateFails();
+      throw ex;
+    }
+  }
+
+  /**
+   * Changes the owner of a volume.
+   *
+   * @param volume - Name of the volume.
+   * @param owner - Name of the owner.
+   * @throws IOException
+   */
+  @Override
+  public void setOwner(String volume, String owner) throws IOException {
+    try {
+      metrics.incNumVolumeUpdates();
+      volumeManager.setOwner(volume, owner);
+    } catch (Exception ex) {
+      metrics.incNumVolumeUpdateFails();
+      throw ex;
+    }
+  }
+
+  /**
+   * Changes the Quota on a volume.
+   *
+   * @param volume - Name of the volume.
+   * @param quota - Quota in bytes.
+   * @throws IOException
+   */
+  @Override
+  public void setQuota(String volume, long quota) throws IOException {
+    try {
+      metrics.incNumVolumeUpdates();
+      volumeManager.setQuota(volume, quota);
+    } catch (Exception ex) {
+      metrics.incNumVolumeUpdateFails();
+      throw ex;
+    }
+  }
+
+  /**
+   * Checks if the specified user can access this volume.
+   *
+   * @param volume - volume
+   * @param userAcl - user acls which needs to be checked for access
+   * @return true if the user has required access for the volume,
+   *         false otherwise
+   * @throws IOException
+   */
+  @Override
+  public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
+      throws IOException {
+    try {
+      metrics.incNumVolumeCheckAccesses();
+      return volumeManager.checkVolumeAccess(volume, userAcl);
+    } catch (Exception ex) {
+      metrics.incNumVolumeCheckAccessFails();
+      throw ex;
+    }
+  }
+
+  /**
+   * Gets the volume information.
+   *
+   * @param volume - Volume name.
+   * @return VolumeArgs or exception is thrown.
+   * @throws IOException
+   */
+  @Override
+  public OmVolumeArgs getVolumeInfo(String volume) throws IOException {
+    try {
+      metrics.incNumVolumeInfos();
+      return volumeManager.getVolumeInfo(volume);
+    } catch (Exception ex) {
+      metrics.incNumVolumeInfoFails();
+      throw ex;
+    }
+  }
+
+  /**
+   * Deletes an existing empty volume.
+   *
+   * @param volume - Name of the volume.
+   * @throws IOException
+   */
+  @Override
+  public void deleteVolume(String volume) throws IOException {
+    try {
+      metrics.incNumVolumeDeletes();
+      volumeManager.deleteVolume(volume);
+    } catch (Exception ex) {
+      metrics.incNumVolumeDeleteFails();
+      throw ex;
+    }
+  }
+
+  /**
+   * Lists volume owned by a specific user.
+   *
+   * @param userName - user name
+   * @param prefix - Filter prefix -- Return only entries that match this.
+   * @param prevKey - Previous key -- List starts from the next from the
+   * prevkey
+   * @param maxKeys - Max number of keys to return.
+   * @return List of Volumes.
+   * @throws IOException
+   */
+  @Override
+  public List<OmVolumeArgs> listVolumeByUser(String userName, String prefix,
+                                             String prevKey, int maxKeys) throws IOException {
+    try {
+      metrics.incNumVolumeLists();
+      return volumeManager.listVolumes(userName, prefix, prevKey, maxKeys);
+    } catch (Exception ex) {
+      metrics.incNumVolumeListFails();
+      throw ex;
+    }
+  }
+
+  /**
+   * Lists volume all volumes in the cluster.
+   *
+   * @param prefix - Filter prefix -- Return only entries that match this.
+   * @param prevKey - Previous key -- List starts from the next from the
+   * prevkey
+   * @param maxKeys - Max number of keys to return.
+   * @return List of Volumes.
+   * @throws IOException
+   */
+  @Override
+  public List<OmVolumeArgs> listAllVolumes(String prefix, String prevKey, int
+      maxKeys) throws IOException {
+    try {
+      metrics.incNumVolumeLists();
+      return volumeManager.listVolumes(null, prefix, prevKey, maxKeys);
+    } catch (Exception ex) {
+      metrics.incNumVolumeListFails();
+      throw ex;
+    }
+  }
+
+  /**
+   * Creates a bucket.
+   *
+   * @param bucketInfo - BucketInfo to create bucket.
+   * @throws IOException
+   */
+  @Override
+  public void createBucket(OmBucketInfo bucketInfo) throws IOException {
+    try {
+      metrics.incNumBucketCreates();
+      bucketManager.createBucket(bucketInfo);
+    } catch (Exception ex) {
+      metrics.incNumBucketCreateFails();
+      throw ex;
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<OmBucketInfo> listBuckets(String volumeName,
+                                        String startKey, String prefix, int maxNumOfBuckets)
+      throws IOException {
+    try {
+      metrics.incNumBucketLists();
+      return bucketManager.listBuckets(volumeName,
+          startKey, prefix, maxNumOfBuckets);
+    } catch (IOException ex) {
+      metrics.incNumBucketListFails();
+      throw ex;
+    }
+  }
+
+  /**
+   * Gets the bucket information.
+   *
+   * @param volume - Volume name.
+   * @param bucket - Bucket name.
+   * @return OmBucketInfo or exception is thrown.
+   * @throws IOException
+   */
+  @Override
+  public OmBucketInfo getBucketInfo(String volume, String bucket)
+      throws IOException {
+    try {
+      metrics.incNumBucketInfos();
+      return bucketManager.getBucketInfo(volume, bucket);
+    } catch (Exception ex) {
+      metrics.incNumBucketInfoFails();
+      throw ex;
+    }
+  }
+
+  /**
+   * Allocate a key.
+   *
+   * @param args - attributes of the key.
+   * @return OmKeyInfo - the info about the allocated key.
+   * @throws IOException
+   */
+  @Override
+  public OpenKeySession openKey(OmKeyArgs args) throws IOException {
+    try {
+      metrics.incNumKeyAllocates();
+      return keyManager.openKey(args);
+    } catch (Exception ex) {
+      metrics.incNumKeyAllocateFails();
+      throw ex;
+    }
+  }
+
+  @Override
+  public void commitKey(OmKeyArgs args, int clientID)
+      throws IOException {
+    try {
+      metrics.incNumKeyCommits();
+      keyManager.commitKey(args, clientID);
+    } catch (Exception ex) {
+      metrics.incNumKeyCommitFails();
+      throw ex;
+    }
+  }
+
+  @Override
+  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID)
+      throws IOException {
+    try {
+      metrics.incNumBlockAllocateCalls();
+      return keyManager.allocateBlock(args, clientID);
+    } catch (Exception ex) {
+      metrics.incNumBlockAllocateCallFails();
+      throw ex;
+    }
+  }
+
+  /**
+   * Lookup a key.
+   *
+   * @param args - attributes of the key.
+   * @return OmKeyInfo - the info about the requested key.
+   * @throws IOException
+   */
+  @Override
+  public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
+    try {
+      metrics.incNumKeyLookups();
+      return keyManager.lookupKey(args);
+    } catch (Exception ex) {
+      metrics.incNumKeyLookupFails();
+      throw ex;
+    }
+  }
+
+  @Override
+  public void renameKey(OmKeyArgs args, String toKeyName) throws IOException {
+    try {
+      metrics.incNumKeyRenames();
+      keyManager.renameKey(args, toKeyName);
+    } catch (IOException e) {
+      metrics.incNumKeyRenameFails();
+      throw e;
+    }
+  }
+
+  /**
+   * Deletes an existing key.
+   *
+   * @param args - attributes of the key.
+   * @throws IOException
+   */
+  @Override
+  public void deleteKey(OmKeyArgs args) throws IOException {
+    try {
+      metrics.incNumKeyDeletes();
+      keyManager.deleteKey(args);
+    } catch (Exception ex) {
+      metrics.incNumKeyDeleteFails();
+      throw ex;
+    }
+  }
+
+  @Override
+  public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
+                                  String startKey, String keyPrefix, int maxKeys) throws IOException {
+    try {
+      metrics.incNumKeyLists();
+      return keyManager.listKeys(volumeName, bucketName,
+          startKey, keyPrefix, maxKeys);
+    } catch (IOException ex) {
+      metrics.incNumKeyListFails();
+      throw ex;
+    }
+  }
+
+  /**
+   * Sets bucket property from args.
+   * @param args - BucketArgs.
+   * @throws IOException
+   */
+  @Override
+  public void setBucketProperty(OmBucketArgs args)
+      throws IOException {
+    try {
+      metrics.incNumBucketUpdates();
+      bucketManager.setBucketProperty(args);
+    } catch (Exception ex) {
+      metrics.incNumBucketUpdateFails();
+      throw ex;
+    }
+  }
+
+
+  /**
+   * Deletes an existing empty bucket from volume.
+   * @param volume - Name of the volume.
+   * @param bucket - Name of the bucket.
+   * @throws IOException
+   */
+  public void deleteBucket(String volume, String bucket) throws IOException {
+    try {
+      metrics.incNumBucketDeletes();
+      bucketManager.deleteBucket(volume, bucket);
+    } catch (Exception ex) {
+      metrics.incNumBucketDeleteFails();
+      throw ex;
+    }
+  }
+
+  private void registerMXBean() {
+    Map<String, String> jmxProperties = new HashMap<String, String>();
+    jmxProperties.put("component", "ServerRuntime");
+    this.omInfoBeanName =
+        MBeans.register("OzoneManager",
+            "OzoneManagerInfo",
+            jmxProperties,
+            this);
+  }
+
+  private void unregisterMXBean() {
+    if (this.omInfoBeanName != null) {
+      MBeans.unregister(this.omInfoBeanName);
+      this.omInfoBeanName = null;
+    }
+  }
+
+  @Override
+  public String getRpcPort() {
+    return "" + omRpcAddress.getPort();
+  }
+
+  @VisibleForTesting
+  public OzoneManagerHttpServer getHttpServer() {
+    return httpServer;
+  }
+
+  @Override
+  public List<ServiceInfo> getServiceList() throws IOException {
+    // When we implement multi-home this call has to be handled properly.
+    List<ServiceInfo> services = new ArrayList<>();
+    ServiceInfo.Builder omServiceInfoBuilder = ServiceInfo.newBuilder()
+        .setNodeType(HddsProtos.NodeType.OM)
+        .setHostname(omRpcAddress.getHostName())
+        .addServicePort(ServicePort.newBuilder()
+                .setType(ServicePort.Type.RPC)
+                .setValue(omRpcAddress.getPort())
+            .build());
+    if (httpServer.getHttpAddress() != null) {
+      omServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
+          .setType(ServicePort.Type.HTTP)
+          .setValue(httpServer.getHttpAddress().getPort())
+          .build());
+    }
+    if (httpServer.getHttpsAddress() != null) {
+      omServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
+          .setType(ServicePort.Type.HTTPS)
+          .setValue(httpServer.getHttpsAddress().getPort())
+          .build());
+    }
+    services.add(omServiceInfoBuilder.build());
+
+    // For client we have to return SCM with container protocol port,
+    // not block protocol.
+    InetSocketAddress scmAddr = getScmAddressForClients(
+        configuration);
+    ServiceInfo.Builder scmServiceInfoBuilder = ServiceInfo.newBuilder()
+        .setNodeType(HddsProtos.NodeType.SCM)
+        .setHostname(scmAddr.getHostName())
+        .addServicePort(ServicePort.newBuilder()
+            .setType(ServicePort.Type.RPC)
+            .setValue(scmAddr.getPort()).build());
+    services.add(scmServiceInfoBuilder.build());
+
+    List<HddsProtos.Node> nodes = scmContainerClient.queryNode(HEALTHY,
+        HddsProtos.QueryScope.CLUSTER, "");
+
+    for (HddsProtos.Node node : nodes) {
+      HddsProtos.DatanodeDetailsProto datanode = node.getNodeID();
+
+      ServiceInfo.Builder dnServiceInfoBuilder = ServiceInfo.newBuilder()
+          .setNodeType(HddsProtos.NodeType.DATANODE)
+          .setHostname(datanode.getHostName());
+
+      dnServiceInfoBuilder.addServicePort(ServicePort.newBuilder()
+          .setType(ServicePort.Type.HTTP)
+          .setValue(DatanodeDetails.getFromProtoBuf(datanode)
+              .getPort(DatanodeDetails.Port.Name.REST).getValue())
+          .build());
+
+      services.add(dnServiceInfoBuilder.build());
+    }
+
+    metrics.incNumGetServiceLists();
+    // For now there is no exception that can can happen in this call,
+    // so failure metrics is not handled. In future if there is any need to
+    // handle exception in this method, we need to incorporate
+    // metrics.incNumGetServiceListFails()
+    return services;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java
new file mode 100644
index 0000000..bd6ab69
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerHttpServer.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdds.server.BaseHttpServer;
+
+import java.io.IOException;
+
+/**
+ * HttpServer wrapper for the OzoneManager.
+ */
+public class OzoneManagerHttpServer extends BaseHttpServer {
+
+  public OzoneManagerHttpServer(Configuration conf, OzoneManager om)
+      throws IOException {
+    super(conf, "ozoneManager");
+    addServlet("serviceList", "/serviceList", ServiceListJSONServlet.class);
+    getWebAppContext().setAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE, om);
+  }
+
+  @Override protected String getHttpAddressKey() {
+    return OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY;
+  }
+
+  @Override protected String getHttpBindHostKey() {
+    return OMConfigKeys.OZONE_OM_HTTP_BIND_HOST_KEY;
+  }
+
+  @Override protected String getHttpsAddressKey() {
+    return OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY;
+  }
+
+  @Override protected String getHttpsBindHostKey() {
+    return OMConfigKeys.OZONE_OM_HTTPS_BIND_HOST_KEY;
+  }
+
+  @Override protected String getBindHostDefault() {
+    return OMConfigKeys.OZONE_OM_HTTP_BIND_HOST_DEFAULT;
+  }
+
+  @Override protected int getHttpBindPortDefault() {
+    return OMConfigKeys.OZONE_OM_HTTP_BIND_PORT_DEFAULT;
+  }
+
+  @Override protected int getHttpsBindPortDefault() {
+    return OMConfigKeys.OZONE_OM_HTTPS_BIND_PORT_DEFAULT;
+  }
+
+  @Override protected String getKeytabFile() {
+    return OMConfigKeys.OZONE_OM_KEYTAB_FILE;
+  }
+
+  @Override protected String getSpnegoPrincipal() {
+    return OzoneConfigKeys.OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL;
+  }
+
+  @Override protected String getEnabledKey() {
+    return OMConfigKeys.OZONE_OM_HTTP_ENABLED_KEY;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ServiceListJSONServlet.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ServiceListJSONServlet.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ServiceListJSONServlet.java
new file mode 100644
index 0000000..47713e2
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ServiceListJSONServlet.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+
+/**
+ * Provides REST access to Ozone Service List.
+ * <p>
+ * This servlet generally will be placed under the /serviceList URL of
+ * OzoneManager HttpServer.
+ *
+ * The return format is of JSON and in the form
+ * <p>
+ *  <code><pre>
+ *  {
+ *    "services" : [
+ *      {
+ *        "NodeType":"OM",
+ *        "Hostname" "$hostname",
+ *        "ports" : {
+ *          "$PortType" : "$port",
+ *          ...
+ *        }
+ *      }
+ *    ]
+ *  }
+ *  </pre></code>
+ *  <p>
+ *
+ */
+public class ServiceListJSONServlet  extends HttpServlet  {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ServiceListJSONServlet.class);
+  private static final long serialVersionUID = 1L;
+
+  private transient OzoneManager om;
+
+  public void init() throws ServletException {
+    this.om = (OzoneManager) getServletContext()
+        .getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE);
+  }
+
+  /**
+   * Process a GET request for the specified resource.
+   *
+   * @param request
+   *          The servlet request we are processing
+   * @param response
+   *          The servlet response we are creating
+   */
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response) {
+    try {
+      ObjectMapper objectMapper = new ObjectMapper();
+      objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
+      response.setContentType("application/json; charset=utf8");
+      PrintWriter writer = response.getWriter();
+      try {
+        writer.write(objectMapper.writeValueAsString(om.getServiceList()));
+      } finally {
+        if (writer != null) {
+          writer.close();
+        }
+      }
+    } catch (IOException e) {
+      LOG.error(
+          "Caught an exception while processing ServiceList request", e);
+      response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java
new file mode 100644
index 0000000..8475dd9
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/VolumeManager.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.protocol.proto
+    .OzoneManagerProtocolProtos.OzoneAclInfo;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * OM volume manager interface.
+ */
+public interface VolumeManager {
+
+  /**
+   * Create a new volume.
+   * @param args - Volume args to create a volume
+   */
+  void createVolume(OmVolumeArgs args) throws IOException;
+
+  /**
+   * Changes the owner of a volume.
+   *
+   * @param volume - Name of the volume.
+   * @param owner - Name of the owner.
+   * @throws IOException
+   */
+  void setOwner(String volume, String owner) throws IOException;
+
+  /**
+   * Changes the Quota on a volume.
+   *
+   * @param volume - Name of the volume.
+   * @param quota - Quota in bytes.
+   * @throws IOException
+   */
+  void setQuota(String volume, long quota) throws IOException;
+
+  /**
+   * Gets the volume information.
+   * @param volume - Volume name.
+   * @return VolumeArgs or exception is thrown.
+   * @throws IOException
+   */
+  OmVolumeArgs getVolumeInfo(String volume) throws IOException;
+
+  /**
+   * Deletes an existing empty volume.
+   *
+   * @param volume - Name of the volume.
+   * @throws IOException
+   */
+  void deleteVolume(String volume) throws IOException;
+
+  /**
+   * Checks if the specified user with a role can access this volume.
+   *
+   * @param volume - volume
+   * @param userAcl - user acl which needs to be checked for access
+   * @return true if the user has access for the volume, false otherwise
+   * @throws IOException
+   */
+  boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
+      throws IOException;
+
+  /**
+   * Returns a list of volumes owned by a given user; if user is null,
+   * returns all volumes.
+   *
+   * @param userName
+   *   volume owner
+   * @param prefix
+   *   the volume prefix used to filter the listing result.
+   * @param startKey
+   *   the start volume name determines where to start listing from,
+   *   this key is excluded from the result.
+   * @param maxKeys
+   *   the maximum number of volumes to return.
+   * @return a list of {@link OmVolumeArgs}
+   * @throws IOException
+   */
+  List<OmVolumeArgs> listVolumes(String userName, String prefix,
+                                 String startKey, int maxKeys) throws IOException;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message