Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8C6D7200CFD for ; Thu, 17 Aug 2017 02:11:51 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8A946169DA4; Thu, 17 Aug 2017 00:11:51 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 63383169D7A for ; Thu, 17 Aug 2017 02:11:49 +0200 (CEST) Received: (qmail 80132 invoked by uid 500); 17 Aug 2017 00:11:47 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 79287 invoked by uid 99); 17 Aug 2017 00:11:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Aug 2017 00:11:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 72409F565C; Thu, 17 Aug 2017 00:11:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: inigoiri@apache.org To: common-commits@hadoop.apache.org Date: Thu, 17 Aug 2017 00:11:52 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [07/24] hadoop git commit: HDFS-10630. Federation State Store FS Implementation. Contributed by Jason Kace and Inigo Goiri. archived-at: Thu, 17 Aug 2017 00:11:51 -0000 HDFS-10630. Federation State Store FS Implementation. Contributed by Jason Kace and Inigo Goiri. 
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d91628b8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d91628b8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d91628b8 Branch: refs/heads/HDFS-10467 Commit: d91628b880ffdd9bb1079a35133fc0b8ce5c3593 Parents: 3a6f78d Author: Inigo Goiri Authored: Tue May 2 15:49:53 2017 -0700 Committer: Inigo Goiri Committed: Wed Aug 16 17:11:34 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 14 + .../federation/router/PeriodicService.java | 198 ++++++++ .../StateStoreConnectionMonitorService.java | 67 +++ .../federation/store/StateStoreService.java | 152 +++++- .../federation/store/StateStoreUtils.java | 51 +- .../store/driver/StateStoreDriver.java | 31 +- .../driver/StateStoreRecordOperations.java | 17 +- .../store/driver/impl/StateStoreBaseImpl.java | 31 +- .../driver/impl/StateStoreFileBaseImpl.java | 429 ++++++++++++++++ .../store/driver/impl/StateStoreFileImpl.java | 161 +++++++ .../driver/impl/StateStoreFileSystemImpl.java | 178 +++++++ .../driver/impl/StateStoreSerializableImpl.java | 77 +++ .../federation/store/records/BaseRecord.java | 20 +- .../server/federation/store/records/Query.java | 66 +++ .../src/main/resources/hdfs-default.xml | 16 + .../store/FederationStateStoreTestUtils.java | 232 +++++++++ .../store/driver/TestStateStoreDriverBase.java | 483 +++++++++++++++++++ .../store/driver/TestStateStoreFile.java | 64 +++ .../store/driver/TestStateStoreFileSystem.java | 88 ++++ 19 files changed, 2329 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 320e1f3..2b6d0e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -25,6 +27,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl; import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl; import org.apache.hadoop.http.HttpConfig; @@ -1119,6 +1123,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys { FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT = StateStoreSerializerPBImpl.class; + public static final String FEDERATION_STORE_DRIVER_CLASS = + FEDERATION_STORE_PREFIX + "driver.class"; + public static final Class + FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreFileImpl.class; + + public static final String FEDERATION_STORE_CONNECTION_TEST_MS = + FEDERATION_STORE_PREFIX + "connection.test"; + public static final long FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT = + TimeUnit.MINUTES.toMillis(1); + // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String 
DFS_CLIENT_RETRY_POLICY_ENABLED_KEY http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java new file mode 100644 index 0000000..5e12222 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Service to periodically execute a runnable. + */ +public abstract class PeriodicService extends AbstractService { + + private static final Logger LOG = + LoggerFactory.getLogger(PeriodicService.class); + + /** Default interval in milliseconds for the periodic service. */ + private static final long DEFAULT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); + + + /** Interval for running the periodic service in milliseconds. */ + private long intervalMs; + /** Name of the service. */ + private final String serviceName; + + /** Scheduler for the periodic service. */ + private final ScheduledExecutorService scheduler; + + /** If the service is running. */ + private volatile boolean isRunning = false; + + /** How many times we run. */ + private long runCount; + /** How many errors we got. */ + private long errorCount; + /** When was the last time we executed this service successfully. */ + private long lastRun; + + /** + * Create a new periodic update service. + * + * @param name Name of the service. + */ + public PeriodicService(String name) { + this(name, DEFAULT_INTERVAL_MS); + } + + /** + * Create a new periodic update service. + * + * @param name Name of the service. + * @param interval Interval for the periodic service in milliseconds. 
+ */ + public PeriodicService(String name, long interval) { + super(name); + this.serviceName = name; + this.intervalMs = interval; + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat(this.getName() + "-%d") + .build(); + this.scheduler = Executors.newScheduledThreadPool(1, threadFactory); + } + + /** + * Set the interval for the periodic service. + * + * @param interval Interval in milliseconds. + */ + protected void setIntervalMs(long interval) { + if (getServiceState() == STATE.STARTED) { + throw new ServiceStateException("Periodic service already started"); + } else { + this.intervalMs = interval; + } + } + + /** + * Get the interval for the periodic service. + * + * @return Interval in milliseconds. + */ + protected long getIntervalMs() { + return this.intervalMs; + } + + /** + * Get how many times we failed to run the periodic service. + * + * @return Times we failed to run the periodic service. + */ + protected long getErrorCount() { + return this.errorCount; + } + + /** + * Get how many times we run the periodic service. + * + * @return Times we run the periodic service. + */ + protected long getRunCount() { + return this.runCount; + } + + /** + * Get the last time the periodic service was executed. + * + * @return Last time the periodic service was executed. + */ + protected long getLastUpdate() { + return this.lastRun; + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + LOG.info("Starting periodic service {}", this.serviceName); + startPeriodic(); + } + + @Override + protected void serviceStop() throws Exception { + stopPeriodic(); + LOG.info("Stopping periodic service {}", this.serviceName); + super.serviceStop(); + } + + /** + * Stop the periodic task. + */ + protected synchronized void stopPeriodic() { + if (this.isRunning) { + LOG.info("{} is shutting down", this.serviceName); + this.isRunning = false; + this.scheduler.shutdownNow(); + } + } + + /** + * Start the periodic execution. 
+ */ + protected synchronized void startPeriodic() { + stopPeriodic(); + + // Create the runnable service + Runnable updateRunnable = new Runnable() { + @Override + public void run() { + LOG.debug("Running {} update task", serviceName); + try { + if (!isRunning) { + return; + } + periodicInvoke(); + runCount++; + lastRun = Time.now(); + } catch (Exception ex) { + errorCount++; + LOG.warn(serviceName + " service threw an exception", ex); + } + } + }; + + // Start the execution of the periodic service + this.isRunning = true; + this.scheduler.scheduleWithFixedDelay( + updateRunnable, 0, this.intervalMs, TimeUnit.MILLISECONDS); + } + + /** + * Method that the service will run periodically. + */ + protected abstract void periodicInvoke(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java new file mode 100644 index 0000000..4d279c5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreConnectionMonitorService.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.PeriodicService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to periodically monitor the connection of the StateStore + * {@link StateStoreService} data store and to re-open the connection + * to the data store if required. + */ +public class StateStoreConnectionMonitorService extends PeriodicService { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreConnectionMonitorService.class); + + /** Service that maintains the State Store connection. */ + private final StateStoreService stateStore; + + + /** + * Create a new service to monitor the connectivity of the state store driver. + * + * @param store Instance of the state store to be monitored. 
+ */ + public StateStoreConnectionMonitorService(StateStoreService store) { + super(StateStoreConnectionMonitorService.class.getSimpleName()); + this.stateStore = store; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.setIntervalMs(conf.getLong( + DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS, + DFSConfigKeys.FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT)); + + super.serviceInit(conf); + } + + @Override + public void periodicInvoke() { + LOG.debug("Checking state store connection"); + if (!stateStore.isDriverReady()) { + LOG.info("Attempting to open state store driver."); + stateStore.loadDriver(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java index 866daa3..df207e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java @@ -15,45 +15,168 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hdfs.server.federation.store; +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedList; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; /** * A service to initialize a * {@link org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver - * StateStoreDriver} and maintain the connection to the data store. There - * are multiple state store driver connections supported: + * StateStoreDriver} and maintain the connection to the data store. There are + * multiple state store driver connections supported: *
    - *
  • File {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl. + *
  • File + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl. * StateStoreFileImpl StateStoreFileImpl} - *
  • ZooKeeper {@link org.apache.hadoop.hdfs.server.federation.store.driver. - * impl.StateStoreZooKeeperImpl StateStoreZooKeeperImpl} + *
  • ZooKeeper + * {@link org.apache.hadoop.hdfs.server.federation.store.driver.impl. + * StateStoreZooKeeperImpl StateStoreZooKeeperImpl} *
*

- * The service also supports the dynamic registration of data interfaces such as - * the following: + * The service also supports the dynamic registration of record stores like: *

    - *
  • {@link MembershipStateStore}: state of the Namenodes in the + *
  • {@link MembershipStore}: state of the Namenodes in the * federation. *
  • {@link MountTableStore}: Mount table between two subclusters. * See {@link org.apache.hadoop.fs.viewfs.ViewFs ViewFs}. - *
  • {@link RouterStateStore}: State of the routers in the federation. *
*/ @InterfaceAudience.Private @InterfaceStability.Evolving public class StateStoreService extends CompositeService { + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreService.class); + + + /** State Store configuration. */ + private Configuration conf; + /** Identifier for the service. */ private String identifier; - // Stub class - public StateStoreService(String name) { - super(name); + /** Driver for the back end connection. */ + private StateStoreDriver driver; + + /** Service to maintain data store connection. */ + private StateStoreConnectionMonitorService monitorService; + + + public StateStoreService() { + super(StateStoreService.class.getName()); + } + + /** + * Initialize the State Store and the connection to the backend. + * + * @param config Configuration for the State Store. + * @throws IOException + */ + @Override + protected void serviceInit(Configuration config) throws Exception { + this.conf = config; + + // Create implementation of State Store + Class driverClass = this.conf.getClass( + DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS, + DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT, + StateStoreDriver.class); + this.driver = ReflectionUtils.newInstance(driverClass, this.conf); + + if (this.driver == null) { + throw new IOException("Cannot create driver for the State Store"); + } + + // Check the connection to the State Store periodically + this.monitorService = new StateStoreConnectionMonitorService(this); + this.addService(monitorService); + + super.serviceInit(this.conf); + } + + @Override + protected void serviceStart() throws Exception { + loadDriver(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + closeDriver(); + + super.serviceStop(); + } + + /** + * List of records supported by this State Store. + * + * @return List of supported record classes. 
+ */ + public Collection> getSupportedRecords() { + // TODO add list of records + return new LinkedList<>(); + } + + /** + * Load the State Store driver. If successful, refresh cached data tables. + */ + public void loadDriver() { + synchronized (this.driver) { + if (!isDriverReady()) { + String driverName = this.driver.getClass().getSimpleName(); + if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) { + LOG.info("Connection to the State Store driver {} is open and ready", + driverName); + } else { + LOG.error("Cannot initialize State Store driver {}", driverName); + } + } + } + } + + /** + * Check if the driver is ready to be used. + * + * @return If the driver is ready. + */ + public boolean isDriverReady() { + return this.driver.isDriverReady(); + } + + /** + * Manually shuts down the driver. + * + * @throws Exception If the driver cannot be closed. + */ + @VisibleForTesting + public void closeDriver() throws Exception { + if (this.driver != null) { + this.driver.close(); + } + } + + /** + * Get the state store driver. + * + * @return State store driver. 
+ */ + public StateStoreDriver getDriver() { + return this.driver; } /** @@ -74,4 +197,5 @@ public class StateStoreService extends CompositeService { public void setIdentifier(String id) { this.identifier = id; } -} + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java index 8c681df..0a36619 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java @@ -17,17 +17,22 @@ */ package org.apache.hadoop.hdfs.server.federation.store; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Set of utility functions used to query, create, update and delete data - * records in the state store. + * Set of utility functions used to work with the State Store. 
*/ public final class StateStoreUtils { - private static final Log LOG = LogFactory.getLog(StateStoreUtils.class); + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreUtils.class); + private StateStoreUtils() { // Utility class @@ -52,7 +57,7 @@ public final class StateStoreUtils { // Check if we went too far if (actualClazz.equals(BaseRecord.class)) { - LOG.error("We went too far (" + actualClazz + ") with " + clazz); + LOG.error("We went too far ({}) with {}", actualClazz, clazz); actualClazz = clazz; } return actualClazz; @@ -69,4 +74,36 @@ public final class StateStoreUtils { Class getRecordClass(final T record) { return getRecordClass(record.getClass()); } -} + + /** + * Get the base class name for a record. If we get an implementation of a + * record we will return the real parent record class. + * + * @param clazz Class of the data record to check. + * @return Name of the base class for the record. + */ + public static String getRecordName( + final Class clazz) { + return getRecordClass(clazz).getSimpleName(); + } + + /** + * Filters a list of records to find all records matching the query. + * + * @param query Map of field names and objects to use to filter results. + * @param records List of data records to filter. + * @return List of all records matching the query (or empty list if none + * match), null if the data set could not be filtered. 
+ */ + public static List filterMultiple( + final Query query, final Iterable records) { + + List matchingList = new ArrayList<>(); + for (T record : records) { + if (query.matches(record)) { + matchingList.add(record); + } + } + return matchingList; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java index a1527df..90111bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java @@ -18,15 +18,16 @@ package org.apache.hadoop.hdfs.server.federation.store.driver; import java.net.InetAddress; -import java.util.List; +import java.util.Collection; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Driver class for an implementation of a {@link StateStoreService} @@ -35,7 +36,8 @@ import org.apache.hadoop.util.Time; */ public abstract class StateStoreDriver implements StateStoreRecordOperations { - private 
static final Log LOG = LogFactory.getLog(StateStoreDriver.class); + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreDriver.class); /** State Store configuration. */ @@ -47,13 +49,14 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { /** * Initialize the state store connection. + * * @param config Configuration for the driver. * @param id Identifier for the driver. * @param records Records that are supported. * @return If initialized and ready, false if failed to initialize driver. */ public boolean init(final Configuration config, final String id, - final List> records) { + final Collection> records) { this.conf = config; this.identifier = id; @@ -62,8 +65,20 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { LOG.warn("The identifier for the State Store connection is not set"); } - // TODO stub - return false; + boolean success = initDriver(); + if (!success) { + LOG.error("Cannot intialize driver for {}", getDriverName()); + return false; + } + + for (Class cls : records) { + String recordString = StateStoreUtils.getRecordName(cls); + if (!initRecordStorage(recordString, cls)) { + LOG.error("Cannot initialize record store for {}", cls.getSimpleName()); + return false; + } + } + return true; } /** @@ -169,4 +184,4 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { } return hostname; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java 
index 739eeba..e76a733 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java @@ -19,11 +19,11 @@ package org.apache.hadoop.hdfs.server.federation.store.driver; import java.io.IOException; import java.util.List; -import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; import org.apache.hadoop.io.retry.AtMostOnce; import org.apache.hadoop.io.retry.Idempotent; @@ -67,14 +67,14 @@ public interface StateStoreRecordOperations { * Get a single record from the store that matches the query. * * @param clazz Class of record to fetch. - * @param query Map of field names and objects to filter results. + * @param query Query to filter results. * @return A single record matching the query. Null if there are no matching * records or more than one matching record in the store. * @throws IOException If multiple records match or if the data store cannot * be queried. */ @Idempotent - T get(Class clazz, Map query) + T get(Class clazz, Query query) throws IOException; /** @@ -83,14 +83,14 @@ public interface StateStoreRecordOperations { * supports filtering it should overwrite this method. * * @param clazz Class of record to fetch. - * @param query Map of field names and objects to filter results. + * @param query Query to filter results. * @return Records of type clazz that match the query or empty list if none * are found. * @throws IOException Throws exception if unable to query the data store. 
*/ @Idempotent List getMultiple( - Class clazz, Map query) throws IOException; + Class clazz, Query query) throws IOException; /** * Creates a single record. Optionally updates an existing record with same @@ -152,13 +152,12 @@ public interface StateStoreRecordOperations { * Remove multiple records of a specific class that match a query. Requires * the getAll implementation to fetch fresh records on each call. * - * @param clazz Class of record to remove. - * @param filter matching filter to remove. + * @param query Query to filter what to remove. * @return The number of records removed. * @throws IOException Throws exception if unable to query the data store. */ @AtMostOnce - int remove(Class clazz, Map filter) + int remove(Class clazz, Query query) throws IOException; -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java index b711fa9..1bd35f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java @@ -17,14 +17,17 @@ */ package org.apache.hadoop.hdfs.server.federation.store.driver.impl; +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; + import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import 
org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; /** * Base implementation of a State Store driver. It contains default @@ -41,7 +44,7 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver { @Override public T get( - Class clazz, Map query) throws IOException { + Class clazz, Query query) throws IOException { List records = getMultiple(clazz, query); if (records.size() > 1) { throw new IOException("Found more than one object in collection"); @@ -53,17 +56,31 @@ public abstract class StateStoreBaseImpl extends StateStoreDriver { } @Override + public List getMultiple( + Class clazz, Query query) throws IOException { + QueryResult result = get(clazz); + List records = result.getRecords(); + List ret = filterMultiple(query, records); + if (ret == null) { + throw new IOException("Cannot fetch records from the store"); + } + return ret; + } + + @Override public boolean put( T record, boolean allowUpdate, boolean errorIfExists) throws IOException { - List singletonList = new ArrayList(); + List singletonList = new ArrayList<>(); singletonList.add(record); return putAll(singletonList, allowUpdate, errorIfExists); } @Override public boolean remove(T record) throws IOException { - Map primaryKeys = record.getPrimaryKeys(); - Class clazz = StateStoreUtils.getRecordClass(record); - return remove(clazz, primaryKeys) == 1; + final Query query = new Query(record); + Class clazz = record.getClass(); + @SuppressWarnings("unchecked") + Class recordClass = (Class)StateStoreUtils.getRecordClass(clazz); + return remove(recordClass, query) == 1; } -} +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java new file mode 100644 index 0000000..d7c00ff --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java @@ -0,0 +1,429 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; +import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordClass; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.Query; +import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link StateStoreDriver} implementation based on a local file. + */ +public abstract class StateStoreFileBaseImpl + extends StateStoreSerializableImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreFileBaseImpl.class); + + /** If it is initialized. */ + private boolean initialized = false; + + /** Name of the file containing the data. */ + private static final String DATA_FILE_NAME = "records.data"; + + + /** + * Lock reading records. + * + * @param clazz Class of the record. + */ + protected abstract void lockRecordRead(Class clazz); + + /** + * Unlock reading records. + * + * @param clazz Class of the record. + */ + protected abstract void unlockRecordRead( + Class clazz); + + /** + * Lock writing records. + * + * @param clazz Class of the record. + */ + protected abstract void lockRecordWrite( + Class clazz); + + /** + * Unlock writing records. + * + * @param clazz Class of the record. 
+ */ + protected abstract void unlockRecordWrite( + Class clazz); + + /** + * Get the reader for the file system. + * + * @param clazz Class of the record. + */ + protected abstract BufferedReader getReader( + Class clazz, String sub); + + /** + * Get the writer for the file system. + * + * @param clazz Class of the record. + */ + protected abstract BufferedWriter getWriter( + Class clazz, String sub); + + /** + * Check if a path exists. + * + * @param path Path to check. + * @return If the path exists. + */ + protected abstract boolean exists(String path); + + /** + * Make a directory. + * + * @param path Path of the directory to create. + * @return If the directory was created. + */ + protected abstract boolean mkdir(String path); + + /** + * Get root directory. + * + * @return Root directory. + */ + protected abstract String getRootDir(); + + /** + * Set the driver as initialized. + * + * @param ini If the driver is initialized. + */ + public void setInitialized(boolean ini) { + this.initialized = ini; + } + + @Override + public boolean initDriver() { + String rootDir = getRootDir(); + try { + if (rootDir == null) { + LOG.error("Invalid root directory, unable to initialize driver."); + return false; + } + + // Check root path + if (!exists(rootDir)) { + if (!mkdir(rootDir)) { + LOG.error("Cannot create State Store root directory {}", rootDir); + return false; + } + } + } catch (Exception ex) { + LOG.error( + "Cannot initialize filesystem using root directory {}", rootDir, ex); + return false; + } + setInitialized(true); + return true; + } + + @Override + public boolean initRecordStorage( + String className, Class recordClass) { + + String dataDirPath = getRootDir() + "/" + className; + try { + // Create data directories for files + if (!exists(dataDirPath)) { + LOG.info("{} data directory doesn't exist, creating it", dataDirPath); + if (!mkdir(dataDirPath)) { + LOG.error("Cannot create data directory {}", dataDirPath); + return false; + } + String dataFilePath = 
dataDirPath + "/" + DATA_FILE_NAME; + if (!exists(dataFilePath)) { + // Create empty file + List emtpyList = new ArrayList<>(); + if(!writeAll(emtpyList, recordClass)) { + LOG.error("Cannot create data file {}", dataFilePath); + return false; + } + } + } + } catch (Exception ex) { + LOG.error("Cannot create data directory {}", dataDirPath, ex); + return false; + } + return true; + } + + /** + * Read all lines from a file and deserialize into the desired record type. + * + * @param reader Open handle for the file. + * @param recordClass Record class to create. + * @param includeDates True if dateModified/dateCreated are serialized. + * @return List of records. + * @throws IOException + */ + private List getAllFile( + BufferedReader reader, Class clazz, boolean includeDates) + throws IOException { + + List ret = new ArrayList(); + String line; + while ((line = reader.readLine()) != null) { + if (!line.startsWith("#") && line.length() > 0) { + try { + T record = newRecord(line, clazz, includeDates); + ret.add(record); + } catch (Exception ex) { + LOG.error("Cannot parse line in data source file: {}", line, ex); + } + } + } + return ret; + } + + @Override + public QueryResult get(Class clazz) + throws IOException { + return get(clazz, (String)null); + } + + @Override + public QueryResult get(Class clazz, String sub) + throws IOException { + verifyDriverReady(); + BufferedReader reader = null; + lockRecordRead(clazz); + try { + reader = getReader(clazz, sub); + List data = getAllFile(reader, clazz, true); + return new QueryResult(data, getTime()); + } catch (Exception ex) { + LOG.error("Cannot fetch records {}", clazz.getSimpleName()); + throw new IOException("Cannot read from data store " + ex.getMessage()); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + LOG.error("Failed closing file", e); + } + } + unlockRecordRead(clazz); + } + } + + /** + * Overwrite the existing data with a new data set. 
+ * + * @param records Collection of records to write. + * @param writer BufferedWriter stream to write to. + * @return If the records were successfully written. + */ + private boolean writeAllFile( + Collection records, BufferedWriter writer) { + + try { + for (BaseRecord record : records) { + try { + String data = serializeString(record); + writer.write(data); + writer.newLine(); + } catch (IllegalArgumentException ex) { + LOG.error("Cannot write record {} to file", record, ex); + } + } + writer.flush(); + return true; + } catch (IOException e) { + LOG.error("Cannot commit records to file", e); + return false; + } + } + + /** + * Overwrite the existing data with a new data set. Replaces all records in + * the data store for this record class. If all records in the data store are + * not successfully committed, this function must return false and leave the + * data store unchanged. + * + * @param records List of records to write. All records must be of type + * recordClass. + * @param recordClass Class of record to replace. + * @return true if all operations were successful, false otherwise. + * @throws StateStoreUnavailableException + */ + public boolean writeAll( + Collection records, Class recordClass) + throws StateStoreUnavailableException { + verifyDriverReady(); + lockRecordWrite(recordClass); + BufferedWriter writer = null; + try { + writer = getWriter(recordClass, null); + return writeAllFile(records, writer); + } catch (Exception e) { + LOG.error( + "Cannot add records to file for {}", recordClass.getSimpleName(), e); + return false; + } finally { + if (writer != null) { + try { + writer.close(); + } catch (IOException e) { + LOG.error( + "Cannot close writer for {}", recordClass.getSimpleName(), e); + } + } + unlockRecordWrite(recordClass); + } + } + + /** + * Get the data file name. + * + * @return Data file name.
+ */ + protected String getDataFileName() { + return DATA_FILE_NAME; + } + + @Override + public boolean isDriverReady() { + return this.initialized; + } + + @Override + public boolean putAll( + List records, boolean allowUpdate, boolean errorIfExists) + throws StateStoreUnavailableException { + verifyDriverReady(); + + if (records.isEmpty()) { + return true; + } + + @SuppressWarnings("unchecked") + Class clazz = (Class) getRecordClass(records.get(0).getClass()); + QueryResult result; + try { + result = get(clazz); + } catch (IOException e) { + return false; + } + Map writeList = new HashMap<>(); + + // Write all of the existing records + for (T existingRecord : result.getRecords()) { + String key = existingRecord.getPrimaryKey(); + writeList.put(key, existingRecord); + } + + // Add inserts and updates, overwrite any existing values + for (T updatedRecord : records) { + try { + updatedRecord.validate(); + String key = updatedRecord.getPrimaryKey(); + if (writeList.containsKey(key) && allowUpdate) { + // Update + writeList.put(key, updatedRecord); + // Update the mod time stamp. Many backends will use their + // own timestamp for the mod time. 
+ updatedRecord.setDateModified(this.getTime()); + } else if (!writeList.containsKey(key)) { + // Insert + // Create/Mod timestamps are already initialized + writeList.put(key, updatedRecord); + } else if (errorIfExists) { + LOG.error("Attempt to insert record {} that already exists", + updatedRecord); + return false; + } + } catch (IllegalArgumentException ex) { + LOG.error("Cannot write invalid record to State Store", ex); + return false; + } + } + + // Write all + boolean status = writeAll(writeList.values(), clazz); + return status; + } + + @Override + public int remove(Class clazz, Query query) + throws StateStoreUnavailableException { + verifyDriverReady(); + + if (query == null) { + return 0; + } + + int removed = 0; + // Get the current records + try { + final QueryResult result = get(clazz); + final List existingRecords = result.getRecords(); + // Write all of the existing records except those to be removed + final List recordsToRemove = filterMultiple(query, existingRecords); + removed = recordsToRemove.size(); + final List newRecords = new LinkedList<>(); + for (T record : existingRecords) { + if (!recordsToRemove.contains(record)) { + newRecords.add(record); + } + } + if (!writeAll(newRecords, clazz)) { + throw new IOException( + "Cannot remove record " + clazz + " query " + query); + } + } catch (IOException e) { + LOG.error("Cannot remove records {} query {}", clazz, query, e); + } + + return removed; + } + + @Override + public boolean removeAll(Class clazz) + throws StateStoreUnavailableException { + verifyDriverReady(); + List emptyList = new ArrayList<>(); + boolean status = writeAll(emptyList, clazz); + return status; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java new file mode 100644 index 0000000..24e9660 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileImpl.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.io.Files; + +/** + * StateStoreDriver implementation based on a local file. + */ +public class StateStoreFileImpl extends StateStoreFileBaseImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreFileImpl.class); + + /** Configuration keys. */ + public static final String FEDERATION_STORE_FILE_DIRECTORY = + DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.file.directory"; + + /** Synchronization. */ + private static final ReadWriteLock READ_WRITE_LOCK = + new ReentrantReadWriteLock(); + + /** Root directory for the state store. 
*/ + private String rootDirectory; + + + @Override + protected boolean exists(String path) { + File test = new File(path); + return test.exists(); + } + + @Override + protected boolean mkdir(String path) { + File dir = new File(path); + return dir.mkdirs(); + } + + @Override + protected String getRootDir() { + if (this.rootDirectory == null) { + String dir = getConf().get(FEDERATION_STORE_FILE_DIRECTORY); + if (dir == null) { + File tempDir = Files.createTempDir(); + dir = tempDir.getAbsolutePath(); + } + this.rootDirectory = dir; + } + return this.rootDirectory; + } + + @Override + protected void lockRecordWrite(Class recordClass) { + // TODO - Synchronize via FS + READ_WRITE_LOCK.writeLock().lock(); + } + + @Override + protected void unlockRecordWrite( + Class recordClass) { + // TODO - Synchronize via FS + READ_WRITE_LOCK.writeLock().unlock(); + } + + @Override + protected void lockRecordRead(Class recordClass) { + // TODO - Synchronize via FS + READ_WRITE_LOCK.readLock().lock(); + } + + @Override + protected void unlockRecordRead(Class recordClass) { + // TODO - Synchronize via FS + READ_WRITE_LOCK.readLock().unlock(); + } + + @Override + protected BufferedReader getReader( + Class clazz, String sub) { + String filename = StateStoreUtils.getRecordName(clazz); + if (sub != null && sub.length() > 0) { + filename += "/" + sub; + } + filename += "/" + getDataFileName(); + + try { + LOG.debug("Loading file: {}", filename); + File file = new File(getRootDir(), filename); + FileInputStream fis = new FileInputStream(file); + InputStreamReader isr = + new InputStreamReader(fis, StandardCharsets.UTF_8); + BufferedReader reader = new BufferedReader(isr); + return reader; + } catch (Exception ex) { + LOG.error( + "Cannot open read stream for record {}", clazz.getSimpleName(), ex); + return null; + } + } + + @Override + protected BufferedWriter getWriter( + Class clazz, String sub) { + String filename = StateStoreUtils.getRecordName(clazz); + if (sub != null && sub.length() 
> 0) { + filename += "/" + sub; + } + filename += "/" + getDataFileName(); + + try { + File file = new File(getRootDir(), filename); + FileOutputStream fos = new FileOutputStream(file, false); + OutputStreamWriter osw = + new OutputStreamWriter(fos, StandardCharsets.UTF_8); + BufferedWriter writer = new BufferedWriter(osw); + return writer; + } catch (IOException ex) { + LOG.error( + "Cannot open read stream for record {}", clazz.getSimpleName(), ex); + return null; + } + } + + @Override + public void close() throws Exception { + setInitialized(false); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java new file mode 100644 index 0000000..5968421 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileSystemImpl.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.URI; +import java.nio.charset.StandardCharsets; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * StateStoreDriver implementation based on a filesystem. The most common use + * is HDFS as a backend. + */ +public class StateStoreFileSystemImpl extends StateStoreFileBaseImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(StateStoreFileSystemImpl.class); + + + /** Configuration keys. */ + public static final String FEDERATION_STORE_FS_PATH = + DFSConfigKeys.FEDERATION_STORE_PREFIX + "driver.fs.path"; + + /** File system to back the State Store. */ + private FileSystem fs; + /** Working path in the filesystem.
*/ + private String workPath; + + @Override + protected boolean exists(String path) { + try { + return fs.exists(new Path(path)); + } catch (IOException e) { + return false; + } + } + + @Override + protected boolean mkdir(String path) { + try { + return fs.mkdirs(new Path(path)); + } catch (IOException e) { + return false; + } + } + + @Override + protected String getRootDir() { + if (this.workPath == null) { + String rootPath = getConf().get(FEDERATION_STORE_FS_PATH); + URI workUri; + try { + workUri = new URI(rootPath); + fs = FileSystem.get(workUri, getConf()); + } catch (Exception ex) { + return null; + } + this.workPath = rootPath; + } + return this.workPath; + } + + @Override + public void close() throws Exception { + if (fs != null) { + fs.close(); + } + } + + /** + * Get the folder path for the record class' data. + * + * @param clazz Data record class. + * @return Path of the folder containing the record class' data files. + */ + private Path getPathForClass(Class clazz) { + if (clazz == null) { + return null; + } + // TODO extract table name from class: entry.getTableName() + String className = StateStoreUtils.getRecordName(clazz); + return new Path(workPath, className); + } + + @Override + protected void lockRecordRead(Class clazz) { + // Not required, synced with HDFS leasing + } + + @Override + protected void unlockRecordRead(Class clazz) { + // Not required, synced with HDFS leasing + } + + @Override + protected void lockRecordWrite(Class clazz) { + // TODO -> wait for lease to be available + } + + @Override + protected void unlockRecordWrite(Class clazz) { + // TODO -> ensure lease is closed for the file + } + + @Override + protected BufferedReader getReader( + Class clazz, String sub) { + + Path path = getPathForClass(clazz); + if (sub != null && sub.length() > 0) { + path = Path.mergePaths(path, new Path("/" + sub)); + } + path = Path.mergePaths(path, new Path("/" + getDataFileName())); + + try { + FSDataInputStream fdis = fs.open(path); +
InputStreamReader isr = + new InputStreamReader(fdis, StandardCharsets.UTF_8); + BufferedReader reader = new BufferedReader(isr); + return reader; + } catch (IOException ex) { + LOG.error("Cannot open write stream for {} to {}", + clazz.getSimpleName(), path); + return null; + } + } + + @Override + protected BufferedWriter getWriter( + Class clazz, String sub) { + + Path path = getPathForClass(clazz); + if (sub != null && sub.length() > 0) { + path = Path.mergePaths(path, new Path("/" + sub)); + } + path = Path.mergePaths(path, new Path("/" + getDataFileName())); + + try { + FSDataOutputStream fdos = fs.create(path, true); + OutputStreamWriter osw = + new OutputStreamWriter(fdos, StandardCharsets.UTF_8); + BufferedWriter writer = new BufferedWriter(osw); + return writer; + } catch (IOException ex) { + LOG.error("Cannot open write stream for {} to {}", + clazz.getSimpleName(), path); + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java new file mode 100644 index 0000000..e9b3fdf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.driver.impl; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; + +/** + * State Store driver that stores a serialization of the records. The serializer + * is pluggable. + */ +public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl { + + /** Default serializer for this driver. */ + private StateStoreSerializer serializer; + + + @Override + public boolean init(final Configuration config, final String id, + final Collection> records) { + boolean ret = super.init(config, id, records); + + this.serializer = StateStoreSerializer.getSerializer(config); + + return ret; + } + + /** + * Serialize a record using the serializer. + * @param record Record to serialize. + * @return Byte array with the serialization of the record. + */ + protected byte[] serialize(T record) { + return serializer.serialize(record); + } + + /** + * Serialize a record using the serializer. + * @param record Record to serialize. + * @return String with the serialization of the record. 
+ */ + protected String serializeString(T record) { + return serializer.serializeString(record); + } + + /** + * Creates a record from an input data string. + * @param data Serialized text of the record. + * @param clazz Record class. + * @param includeDates If dateModified and dateCreated are serialized. + * @return The created record. + * @throws IOException If the serializer fails to deserialize the data. + */ + protected T newRecord( + String data, Class clazz, boolean includeDates) throws IOException { + return serializer.deserialize(data, clazz); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java index 4192a3d..79f99c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java @@ -123,6 +123,24 @@ public abstract class BaseRecord implements Comparable { } /** + * Check if this record matches a partial record. + * + * @param other Partial record. + * @return If this record matches. + */ + public boolean like(BaseRecord other) { + if (other == null) { + return false; + } + Map thisKeys = this.getPrimaryKeys(); + Map otherKeys = other.getPrimaryKeys(); + if (thisKeys == null) { + return otherKeys == null; + } + return thisKeys.equals(otherKeys); + } + + /** * Override equals check to use primary key(s) for comparison.
*/ @Override @@ -186,4 +204,4 @@ public abstract class BaseRecord implements Comparable { public String toString() { return getPrimaryKey(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java new file mode 100644 index 0000000..3c59abf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/Query.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.records; + +/** + * Check if a record matches a query. The query is usually a partial record. + * + * @param Type of the record to query. + */ +public class Query { + + /** Partial object to compare against. 
*/ + private final T partial; + + + /** + * Create a query to search for a partial record. + * + * @param part It defines the attributes to search. + */ + public Query(final T part) { + this.partial = part; + } + + /** + * Get the partial record used to query. + * + * @return The partial record used for the query. + */ + public T getPartial() { + return this.partial; + } + + /** + * Check if a record matches the primary keys or the partial record. + * + * @param other Record to check. + * @return If the record matches. Don't match if there is no partial. + */ + public boolean matches(T other) { + if (this.partial == null) { + return false; + } + return this.partial.like(other); + } + + @Override + public String toString() { + return "Checking: " + this.partial; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 71e45d2..02140a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4579,4 +4579,20 @@ + + dfs.federation.router.store.driver.class + org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl + + Class to implement the State Store. By default it uses the local disk. + + + + + dfs.federation.router.store.connection.test + 60000 + + How often to check for the connection to the State Store in milliseconds.
+ + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/d91628b8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java new file mode 100644 index 0000000..fc5aebd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.store; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS; +import static org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileImpl.FEDERATION_STORE_FILE_DIRECTORY; +import static org.junit.Assert.assertNotNull; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreFileBaseImpl; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.util.Time; + +/** + * Utilities to test the State Store. + */ +public final class FederationStateStoreTestUtils { + + private FederationStateStoreTestUtils() { + // Utility Class + } + + /** + * Get the default State Store driver implementation. + * + * @return Class of the default State Store driver implementation. + */ + public static Class getDefaultDriver() { + return DFSConfigKeys.FEDERATION_STORE_DRIVER_CLASS_DEFAULT; + } + + /** + * Create a default State Store configuration. + * + * @return State Store configuration. + */ + public static Configuration getStateStoreConfiguration() { + Class clazz = getDefaultDriver(); + return getStateStoreConfiguration(clazz); + } + + /** + * Create a new State Store configuration for a particular driver. + * + * @param clazz Class of the driver to create. + * @return State Store configuration. 
+ */ + public static Configuration getStateStoreConfiguration( + Class clazz) { + Configuration conf = new HdfsConfiguration(false); + + conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "hdfs://test"); + + conf.setClass(FEDERATION_STORE_DRIVER_CLASS, clazz, StateStoreDriver.class); + + if (StateStoreFileBaseImpl.class.isAssignableFrom(clazz)) { + setFileConfiguration(conf); + } + return conf; + } + + /** + * Create a new State Store based on a configuration. + * + * @param configuration Configuration for the State Store. + * @return New State Store service. + * @throws IOException If it cannot create the State Store. + * @throws InterruptedException If we cannot wait for the store to start. + */ + public static StateStoreService getStateStore( + Configuration configuration) throws IOException, InterruptedException { + + StateStoreService stateStore = new StateStoreService(); + assertNotNull(stateStore); + + // Set unique identifier, this is normally the router address + String identifier = UUID.randomUUID().toString(); + stateStore.setIdentifier(identifier); + + stateStore.init(configuration); + stateStore.start(); + + // Wait for state store to connect + waitStateStore(stateStore, TimeUnit.SECONDS.toMillis(10)); + + return stateStore; + } + + /** + * Wait for the State Store to initialize its driver. + * + * @param stateStore State Store. + * @param timeoutMs Time out in milliseconds. + * @throws IOException If the State Store cannot be reached. + * @throws InterruptedException If the sleep is interrupted.
+ */ + public static void waitStateStore(StateStoreService stateStore, + long timeoutMs) throws IOException, InterruptedException { + long startingTime = Time.monotonicNow(); + while (!stateStore.isDriverReady()) { + Thread.sleep(100); + if (Time.monotonicNow() - startingTime > timeoutMs) { + throw new IOException("Timeout waiting for State Store to connect"); + } + } + } + + /** + * Delete the default State Store. + * + * @throws IOException If it cannot be deleted. + */ + public static void deleteStateStore() throws IOException { + Class driverClass = getDefaultDriver(); + deleteStateStore(driverClass); + } + + /** + * Delete the State Store. + * @param driverClass Class of the State Store driver implementation. + * @throws IOException If it cannot be deleted. + */ + public static void deleteStateStore( + Class driverClass) throws IOException { + + if (StateStoreFileBaseImpl.class.isAssignableFrom(driverClass)) { + String workingDirectory = System.getProperty("user.dir"); + File dir = new File(workingDirectory + "/statestore"); + if (dir.exists()) { + FileUtils.cleanDirectory(dir); + } + } + } + + /** + * Set the default configuration for drivers based on files. + * + * @param conf Configuration to extend. + */ + public static void setFileConfiguration(Configuration conf) { + String workingPath = System.getProperty("user.dir"); + String stateStorePath = workingPath + "/statestore"; + conf.set(FEDERATION_STORE_FILE_DIRECTORY, stateStorePath); + } + + /** + * Clear all the records from the State Store. + * + * @param store State Store to remove records from. + * @return If the State Store was cleared. + * @throws IOException If it cannot clear the State Store.
+ */ + public static boolean clearAllRecords(StateStoreService store) + throws IOException { + Collection> allRecords = + store.getSupportedRecords(); + for (Class recordType : allRecords) { + if (!clearRecords(store, recordType)) { + return false; + } + } + return true; + } + + /** + * Clear records from a certain type from the State Store. + * + * @param store State Store to remove records from. + * @param recordClass Class of the records to remove. + * @return If the State Store was cleared. + * @throws IOException If it cannot clear the State Store. + */ + public static boolean clearRecords( + StateStoreService store, Class recordClass) throws IOException { + List emptyList = new ArrayList<>(); + if (!synchronizeRecords(store, emptyList, recordClass)) { + return false; + } + return true; + } + + /** + * Synchronize a set of records. Remove all and keep the ones specified. + * + * @param stateStore State Store service managing the driver. + * @param records Records to add. + * @param clazz Class of the record to synchronize. + * @return If the synchronization succeeded. + * @throws IOException If it cannot connect to the State Store. + */ + public static boolean synchronizeRecords( + StateStoreService stateStore, List records, Class clazz) + throws IOException { + StateStoreDriver driver = stateStore.getDriver(); + driver.verifyDriverReady(); + if (driver.removeAll(clazz)) { + if (driver.putAll(records, true, false)) { + return true; + } + } + return false; + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org