hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ste...@apache.org
Subject [3/6] YARN-913 service registry: YARN-2652 add hadoop-yarn-registry package under hadoop-yarn
Date Wed, 08 Oct 2014 19:54:50 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java
new file mode 100644
index 0000000..004be86
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.integration;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.registry.client.types.RegistryPathStatus;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.registry.server.services.RegistryAdminService;
+
+/**
+ * Select an entry by the YARN persistence policy
+ */
+public class SelectByYarnPersistence
+    implements RegistryAdminService.NodeSelector {
+  private final String id;
+  private final String targetPolicy;
+
+  public SelectByYarnPersistence(String id, String targetPolicy) {
+    Preconditions.checkArgument(!StringUtils.isEmpty(id), "id");
+    Preconditions.checkArgument(!StringUtils.isEmpty(targetPolicy),
+        "targetPolicy");
+    this.id = id;
+    this.targetPolicy = targetPolicy;
+  }
+
+  @Override
+  public boolean shouldSelect(String path,
+      RegistryPathStatus registryPathStatus,
+      ServiceRecord serviceRecord) {
+    String policy =
+        serviceRecord.get(YarnRegistryAttributes.YARN_PERSISTENCE, "");
+    return id.equals(serviceRecord.get(YarnRegistryAttributes.YARN_ID, ""))
+           && (targetPolicy.equals(policy));
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "Select by ID %s and policy %s: {}",
+        id, targetPolicy);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java
new file mode 100644
index 0000000..22d8bc5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the classes which integrate with the YARN resource
+ * manager.
+ */
+package org.apache.hadoop.registry.server.integration;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java
new file mode 100644
index 0000000..6962eb8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Server-side classes for the registry
+ * <p>
+ *   These are components intended to be deployed only on servers or in test
+ *   JVMs, rather than on client machines.
+ * <p>
+ *   Example components are: server-side ZK support, a REST service, etc.
+ */
+package org.apache.hadoop.registry.server;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/AddingCompositeService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/AddingCompositeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/AddingCompositeService.java
new file mode 100644
index 0000000..9faede4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/AddingCompositeService.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.services;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.Service;
+
+/**
+ * Composite service that exports the add/remove methods.
+ * <p>
+ * This allows external classes to add services to these methods, after which
+ * they follow the same lifecyce.
+ * <p>
+ * It is essential that any service added is in a state where it can be moved
+ * on with that of the parent services. Specifically, do not add an uninited
+ * service to a parent that is already inited —as the <code>start</code>
+ * operation will then fail
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class AddingCompositeService extends CompositeService {
+
+
+  public AddingCompositeService(String name) {
+    super(name);
+  }
+
+  @Override
+  public void addService(Service service) {
+    super.addService(service);
+  }
+
+  @Override
+  public boolean removeService(Service service) {
+    return super.removeService(service);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java
new file mode 100644
index 0000000..e160d4a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.services;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Curator callback for delete operations completing.
+ * <p>
+ * This callback logs at debug and increments the event counter.
+ */
+public class DeleteCompletionCallback implements BackgroundCallback {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RMRegistryOperationsService.class);
+
+  private AtomicInteger events = new AtomicInteger(0);
+
+  @Override
+  public void processResult(CuratorFramework client,
+      CuratorEvent event) throws
+      Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Delete event {}", event);
+    }
+    events.incrementAndGet();
+  }
+
+  /**
+   * Get the number of deletion events
+   * @return the count of events
+   */
+  public int getEventCount() {
+    return events.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java
new file mode 100644
index 0000000..3fa0c19
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.services;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.impl.zk.BindingInformation;
+import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource;
+import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants;
+import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity;
+import org.apache.hadoop.registry.client.impl.zk.ZookeeperConfigOptions;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+/**
+ * This is a small, localhost Zookeeper service instance that is contained
+ * in a YARN service...it's been derived from Apache Twill.
+ *
+ * It implements {@link RegistryBindingSource} and provides binding information,
+ * <i>once started</i>. Until <code>start()</code> is called, the hostname &
+ * port may be undefined. Accordingly, the service raises an exception in this
+ * condition.
+ *
+ * If you wish to chain together a registry service with this one under
+ * the same <code>CompositeService</code>, this service must be added
+ * as a child first.
+ *
+ * It also sets the configuration parameter
+ * {@link RegistryConstants#KEY_REGISTRY_ZK_QUORUM}
+ * to its connection string. Any code with access to the service configuration
+ * can view it.
+ */
+@InterfaceStability.Evolving
+public class MicroZookeeperService
+    extends AbstractService
+    implements RegistryBindingSource, RegistryConstants,
+    ZookeeperConfigOptions,
+    MicroZookeeperServiceKeys{
+
+
+  private static final Logger
+      LOG = LoggerFactory.getLogger(MicroZookeeperService.class);
+
+  private File instanceDir;
+  private File dataDir;
+  private int tickTime;
+  private int port;
+  private String host;
+  private boolean secureServer;
+
+  private ServerCnxnFactory factory;
+  private BindingInformation binding;
+  private File confDir;
+  private StringBuilder diagnostics = new StringBuilder();
+
+  /**
+   * Create an instance
+   * @param name service name
+   */
+  public MicroZookeeperService(String name) {
+    super(name);
+  }
+
+  /**
+   * Get the connection string.
+   * @return the string
+   * @throws IllegalStateException if the connection is not yet valid
+   */
+  public String getConnectionString() {
+    Preconditions.checkState(factory != null, "service not started");
+    InetSocketAddress addr = factory.getLocalAddress();
+    return String.format("%s:%d", addr.getHostName(), addr.getPort());
+  }
+
+  /**
+   * Get the connection address
+   * @return the connection as an address
+   * @throws IllegalStateException if the connection is not yet valid
+   */
+  public InetSocketAddress getConnectionAddress() {
+    Preconditions.checkState(factory != null, "service not started");
+    return factory.getLocalAddress();
+  }
+
+  /**
+   * Create an inet socket addr from the local host + port number
+   * @param port port to use
+   * @return a (hostname, port) pair
+   * @throws UnknownHostException if the server cannot resolve the host
+   */
+  private InetSocketAddress getAddress(int port) throws UnknownHostException {
+    return new InetSocketAddress(host, port < 0 ? 0 : port);
+  }
+
+  /**
+   * Initialize the service, including choosing a path for the data
+   * @param conf configuration
+   * @throws Exception
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    port = conf.getInt(KEY_ZKSERVICE_PORT, 0);
+    tickTime = conf.getInt(KEY_ZKSERVICE_TICK_TIME,
+        ZooKeeperServer.DEFAULT_TICK_TIME);
+    String instancedirname = conf.getTrimmed(
+        KEY_ZKSERVICE_DIR, "");
+    host = conf.getTrimmed(KEY_ZKSERVICE_HOST, DEFAULT_ZKSERVICE_HOST);
+    if (instancedirname.isEmpty()) {
+      File testdir = new File(System.getProperty("test.dir", "target"));
+      instanceDir = new File(testdir, "zookeeper" + getName());
+    } else {
+      instanceDir = new File(instancedirname);
+      FileUtil.fullyDelete(instanceDir);
+    }
+    LOG.debug("Instance directory is {}", instanceDir);
+    mkdirStrict(instanceDir);
+    dataDir = new File(instanceDir, "data");
+    confDir = new File(instanceDir, "conf");
+    mkdirStrict(dataDir);
+    mkdirStrict(confDir);
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Create a directory, ignoring if the dir is already there,
+   * and failing if a file or something else was at the end of that
+   * path
+   * @param dir dir to guarantee the existence of
+   * @throws IOException IO problems, or path exists but is not a dir
+   */
+  private void mkdirStrict(File dir) throws IOException {
+    if (!dir.mkdirs()) {
+      if (!dir.isDirectory()) {
+        throw new IOException("Failed to mkdir " + dir);
+      }
+    }
+  }
+
+  /**
+   * Append a formatted string to the diagnostics.
+   * <p>
+   * A newline is appended afterwards.
+   * @param text text including any format commands
+   * @param args arguments for the forma operation.
+   */
+  protected void addDiagnostics(String text, Object ... args) {
+    diagnostics.append(String.format(text, args)).append('\n');
+  }
+
+  /**
+   * Get the diagnostics info
+   * @return the diagnostics string built up
+   */
+  public String getDiagnostics() {
+    return diagnostics.toString();
+  }
+
+  /**
+   * set up security. this must be done prior to creating
+   * the ZK instance, as it sets up JAAS if that has not been done already.
+   *
+   * @return true if the cluster has security enabled.
+   */
+  public boolean setupSecurity() throws IOException {
+    Configuration conf = getConfig();
+    String jaasContext = conf.getTrimmed(KEY_REGISTRY_ZKSERVICE_JAAS_CONTEXT);
+    secureServer = StringUtils.isNotEmpty(jaasContext);
+    if (secureServer) {
+      RegistrySecurity.validateContext(jaasContext);
+      RegistrySecurity.bindZKToServerJAASContext(jaasContext);
+      // policy on failed auth
+      System.setProperty(PROP_ZK_ALLOW_FAILED_SASL_CLIENTS,
+        conf.get(KEY_ZKSERVICE_ALLOW_FAILED_SASL_CLIENTS,
+            "true"));
+
+      //needed so that you can use sasl: strings in the registry
+      System.setProperty(RegistryInternalConstants.ZOOKEEPER_AUTH_PROVIDER +".1",
+          RegistryInternalConstants.SASLAUTHENTICATION_PROVIDER);
+      String serverContext =
+          System.getProperty(PROP_ZK_SERVER_SASL_CONTEXT);
+      addDiagnostics("Server JAAS context s = %s", serverContext);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Startup: start ZK. It is only after this that
+   * the binding information is valid.
+   * @throws Exception
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+
+    setupSecurity();
+
+    ZooKeeperServer zkServer = new ZooKeeperServer();
+    FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir);
+    zkServer.setTxnLogFactory(ftxn);
+    zkServer.setTickTime(tickTime);
+
+    LOG.info("Starting Local Zookeeper service");
+    factory = ServerCnxnFactory.createFactory();
+    factory.configure(getAddress(port), -1);
+    factory.startup(zkServer);
+
+    String connectString = getConnectionString();
+    LOG.info("In memory ZK started at {}\n", connectString);
+
+    if (LOG.isDebugEnabled()) {
+      StringWriter sw = new StringWriter();
+      PrintWriter pw = new PrintWriter(sw);
+      zkServer.dumpConf(pw);
+      pw.flush();
+      LOG.debug(sw.toString());
+    }
+    binding = new BindingInformation();
+    binding.ensembleProvider = new FixedEnsembleProvider(connectString);
+    binding.description =
+        getName() + " reachable at \"" + connectString + "\"";
+
+    addDiagnostics(binding.description);
+    // finally: set the binding information in the config
+    getConfig().set(KEY_REGISTRY_ZK_QUORUM, connectString);
+  }
+
+  /**
+   * When the service is stopped, it deletes the data directory
+   * and its contents
+   * @throws Exception
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (factory != null) {
+      factory.shutdown();
+      factory = null;
+    }
+    if (dataDir != null) {
+      FileUtil.fullyDelete(dataDir);
+    }
+  }
+
+  @Override
+  public BindingInformation supplyBindingInformation() {
+    Preconditions.checkNotNull(binding,
+        "Service is not started: binding information undefined");
+    return binding;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperServiceKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperServiceKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperServiceKeys.java
new file mode 100644
index 0000000..f4f4976
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperServiceKeys.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.services;
+
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+
+/**
+ * Service keys for configuring the {@link MicroZookeeperService}.
+ * These are not used in registry clients or the RM-side service,
+ * so are kept separate.
+ */
+public interface MicroZookeeperServiceKeys {
+  public static final String ZKSERVICE_PREFIX =
+      RegistryConstants.REGISTRY_PREFIX + "zk.service.";
+  /**
+   * Key to define the JAAS context for the ZK service: {@value}.
+   */
+  public static final String KEY_REGISTRY_ZKSERVICE_JAAS_CONTEXT =
+      ZKSERVICE_PREFIX + "service.jaas.context";
+
+  /**
+   * ZK servertick time: {@value}
+   */
+  public static final String KEY_ZKSERVICE_TICK_TIME =
+      ZKSERVICE_PREFIX + "ticktime";
+
+  /**
+   * host to register on: {@value}.
+   */
+  public static final String KEY_ZKSERVICE_HOST = ZKSERVICE_PREFIX + "host";
+  /**
+   * Default host to serve on -this is <code>localhost</code> as it
+   * is the only one guaranteed to be available: {@value}.
+   */
+  public static final String DEFAULT_ZKSERVICE_HOST = "localhost";
+  /**
+   * port; 0 or below means "any": {@value}
+   */
+  public static final String KEY_ZKSERVICE_PORT = ZKSERVICE_PREFIX + "port";
+
+  /**
+   * Directory containing data: {@value}
+   */
+  public static final String KEY_ZKSERVICE_DIR = ZKSERVICE_PREFIX + "dir";
+
+  /**
+   * Should failed SASL clients be allowed: {@value}?
+   *
+   * Default is the ZK default: true
+   */
+  public static final String KEY_ZKSERVICE_ALLOW_FAILED_SASL_CLIENTS =
+      ZKSERVICE_PREFIX + "allow.failed.sasl.clients";
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
new file mode 100644
index 0000000..693bb0b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.services;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
+import org.apache.hadoop.registry.client.exceptions.NoPathPermissionsException;
+import org.apache.hadoop.registry.client.exceptions.NoRecordException;
+import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource;
+import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
+import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity;
+import org.apache.hadoop.registry.client.types.RegistryPathStatus;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Administrator service for the registry. This is the one with
+ * permissions to create the base directories and those for users.
+ *
+ * It also includes support for asynchronous operations, so that
+ * zookeeper connectivity problems do not hold up the server code
+ * performing the actions.
+ *
+ * Any action queued via {@link #submit(Callable)} will be
+ * run asynchronously. The {@link #createDirAsync(String, List, boolean)}
+ * is an example of such an an action
+ *
+ * A key async action is the depth-first tree purge, which supports
+ * pluggable policies for deleting entries. The method
+ * {@link #purge(String, NodeSelector, PurgePolicy, BackgroundCallback)}
+ * implements the recursive purge operation —the class
+ * {{AsyncPurge}} provides the asynchronous scheduling of this.
+ */
+public class RegistryAdminService extends RegistryOperationsService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RegistryAdminService.class);
+  /**
+   * The ACL permissions for the user's homedir ACL.
+   */
+  public static final int USER_HOMEDIR_ACL_PERMISSIONS =
+        ZooDefs.Perms.READ | ZooDefs.Perms.WRITE
+      | ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
+
+  /**
+   * Executor for async operations
+   */
+  protected final ExecutorService executor;
+
+  /**
+   * Construct an instance of the service
+   * @param name service name
+   */
+  public RegistryAdminService(String name) {
+    this(name, null);
+  }
+
+  /**
+   * construct an instance of the service, using the
+   * specified binding source to bond to ZK
+   * @param name service name
+   * @param bindingSource provider of ZK binding information
+   */
+  public RegistryAdminService(String name,
+      RegistryBindingSource bindingSource) {
+    super(name, bindingSource);
+    executor = Executors.newCachedThreadPool(
+        new ThreadFactory() {
+          private AtomicInteger counter = new AtomicInteger(1);
+
+          @Override
+          public Thread newThread(Runnable r) {
+            return new Thread(r,
+                "RegistryAdminService " + counter.getAndIncrement());
+          }
+        });
+  }
+
+  /**
+   * Stop the service: halt the executor.
+   * @throws Exception exception.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    stopExecutor();
+    super.serviceStop();
+  }
+
+  /**
+   * Stop the executor if it is not null.
+   * This uses {@link ExecutorService#shutdownNow()}
+   * and so does not block until they have completed.
+   */
+  protected synchronized void stopExecutor() {
+    if (executor != null) {
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Get the executor
+   * @return the executor
+   */
+  protected ExecutorService getExecutor() {
+    return executor;
+  }
+
+  /**
+   * Submit a callable
+   * @param callable callable
+   * @param <V> type of the final get
+   * @return a future to wait on
+   */
+  public <V> Future<V> submit(Callable<V> callable) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Submitting {}", callable);
+    }
+    return getExecutor().submit(callable);
+  }
+
+  /**
+   * Asynchronous operation to create a directory
+   * @param path path
+   * @param acls ACL list
+   * @param createParents flag to indicate parent dirs should be created
+   * as needed
+   * @return the future which will indicate whether or not the operation
+   * succeeded —and propagate any exceptions
+   * @throws IOException
+   */
+  public Future<Boolean> createDirAsync(final String path,
+      final List<ACL> acls,
+      final boolean createParents) throws IOException {
+    return submit(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        return maybeCreate(path, CreateMode.PERSISTENT,
+            acls, createParents);
+      }
+    });
+  }
+
+  /**
+   * Init operation sets up the system ACLs.
+   * @param conf configuration of the service
+   * @throws Exception
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    RegistrySecurity registrySecurity = getRegistrySecurity();
+    if (registrySecurity.isSecureRegistry()) {
+      ACL sasl = registrySecurity.createSaslACLFromCurrentUser(ZooDefs.Perms.ALL);
+      registrySecurity.addSystemACL(sasl);
+      LOG.info("Registry System ACLs:",
+          RegistrySecurity.aclsToString(
+          registrySecurity.getSystemACLs()));
+    }
+  }
+
+  /**
+   * Start the service, including creating base directories with permissions
+   * @throws Exception
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    // create the root directories
+    try {
+      createRootRegistryPaths();
+    } catch (NoPathPermissionsException e) {
+
+      String message = String.format(Locale.ENGLISH,
+          "Failed to create root paths {%s};" +
+          "\ndiagnostics={%s}" +
+          "\ncurrent registry is:" +
+          "\n{%s}",
+          e,
+          bindingDiagnosticDetails(),
+          dumpRegistryRobustly(true));
+
+      LOG.error(" Failure {}", e, e);
+      LOG.error(message);
+
+      // TODO: this is something temporary to deal with the problem
+      // that jenkins is failing this test
+      throw new NoPathPermissionsException(e.getPath().toString(), message, e);
+    }
+  }
+
+  /**
+   * Create the initial registry paths
+   * @throws IOException any failure
+   */
+  @VisibleForTesting
+  public void createRootRegistryPaths() throws IOException {
+
+    List<ACL> systemACLs = getRegistrySecurity().getSystemACLs();
+    LOG.info("System ACLs {}",
+        RegistrySecurity.aclsToString(systemACLs));
+    maybeCreate("", CreateMode.PERSISTENT, systemACLs, false);
+    maybeCreate(PATH_USERS, CreateMode.PERSISTENT,
+        systemACLs, false);
+    maybeCreate(PATH_SYSTEM_SERVICES,
+        CreateMode.PERSISTENT,
+        systemACLs, false);
+  }
+
+  /**
+   * Get the path to a user's home dir
+   * @param username username
+   * @return a path for services underneath
+   */
+  protected String homeDir(String username) {
+    return RegistryUtils.homePathForUser(username);
+  }
+
+  /**
+   * Set up the ACL for the user.
+   * <b>Important: this must run client-side as it needs
+   * to know the id:pass tuple for a user</b>
+   * @param username user name
+   * @param perms permissions
+   * @return an ACL list
+   * @throws IOException ACL creation/parsing problems
+   */
+  public List<ACL> aclsForUser(String username, int perms) throws IOException {
+    List<ACL> clientACLs = getClientAcls();
+    RegistrySecurity security = getRegistrySecurity();
+    if (security.isSecureRegistry()) {
+      clientACLs.add(security.createACLfromUsername(username, perms));
+    }
+    return clientACLs;
+  }
+
+  /**
+   * Start an async operation to create the home path for a user
+   * if it does not exist
+   * @param shortname username, without any @REALM in kerberos
+   * @return the path created
+   * @throws IOException any failure while setting up the operation
+   *
+   */
+  public Future<Boolean> initUserRegistryAsync(final String shortname)
+      throws IOException {
+
+    String homeDir = homeDir(shortname);
+    if (!exists(homeDir)) {
+      // create the directory. The user does not
+      return createDirAsync(homeDir,
+          aclsForUser(shortname,
+              USER_HOMEDIR_ACL_PERMISSIONS),
+          false);
+    }
+    return null;
+  }
+
+  /**
+   * Create the home path for a user if it does not exist.
+   *
+   * This uses {@link #initUserRegistryAsync(String)} and then waits for the
+   * result ... the code path is the same as the async operation; this just
+   * picks up and relays/converts exceptions
+   * @param username username
+   * @return the path created
+   * @throws IOException any failure
+   *
+   */
+  public String initUserRegistry(final String username)
+      throws IOException {
+
+    try {
+      Future<Boolean> future = initUserRegistryAsync(username);
+      future.get();
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException)
+          (new InterruptedIOException(e.toString()).initCause(e));
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) (cause);
+      } else {
+        throw new IOException(cause.toString(), cause);
+      }
+    }
+
+    return homeDir(username);
+  }
+
+  /**
+   * Method to validate the validity of the kerberos realm.
+   * <ul>
+   *   <li>Insecure: not needed.</li>
+   *   <li>Secure: must have been determined.</li>
+   * </ul>
+   */
+  protected void verifyRealmValidity() throws ServiceStateException {
+    if (isSecure()) {
+      String realm = getRegistrySecurity().getKerberosRealm();
+      if (StringUtils.isEmpty(realm)) {
+        throw new ServiceStateException("Cannot determine service realm");
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Started Registry operations in realm {}", realm);
+      }
+    }
+  }
+
+  /**
+   * Policy to purge entries
+   */
+  public enum PurgePolicy {
+    PurgeAll,
+    FailOnChildren,
+    SkipOnChildren
+  }
+
+  /**
+   * Recursive operation to purge all matching records under a base path.
+   * <ol>
+   *   <li>Uses a depth first search</li>
+   *   <li>A match is on ID and persistence policy, or, if policy==-1, any match</li>
+   *   <li>If a record matches then it is deleted without any child searches</li>
+   *   <li>Deletions will be asynchronous if a callback is provided</li>
+   * </ol>
+   *
+   * The code is designed to be robust against parallel deletions taking place;
+   * in such a case it will stop attempting that part of the tree. This
+   * avoid the situation of more than 1 purge happening in parallel and
+   * one of the purge operations deleteing the node tree above the other.
+   * @param path base path
+   * @param selector selector for the purge policy
+   * @param purgePolicy what to do if there is a matching record with children
+   * @param callback optional curator callback
+   * @return the number of delete operations perfomed. As deletes may be for
+   * everything under a path, this may be less than the number of records
+   * actually deleted
+   * @throws IOException problems
+   * @throws PathIsNotEmptyDirectoryException if an entry cannot be deleted
+   * as it has children and the purge policy is FailOnChildren
+   */
+  @VisibleForTesting
+  public int purge(String path,
+      NodeSelector selector,
+      PurgePolicy purgePolicy,
+      BackgroundCallback callback) throws IOException {
+
+
+    boolean toDelete = false;
+    // look at self to see if it has a service record
+    Map<String, RegistryPathStatus> childEntries;
+    Collection<RegistryPathStatus> entries;
+    try {
+      // list this path's children
+      childEntries = RegistryUtils.statChildren(this, path);
+      entries = childEntries.values();
+    } catch (PathNotFoundException e) {
+      // there's no record here, it may have been deleted already.
+      // exit
+      return 0;
+    }
+
+    try {
+      RegistryPathStatus registryPathStatus = stat(path);
+      ServiceRecord serviceRecord = resolve(path);
+      // there is now an entry here.
+      toDelete = selector.shouldSelect(path, registryPathStatus, serviceRecord);
+    } catch (EOFException ignored) {
+      // ignore
+    } catch (InvalidRecordException ignored) {
+      // ignore
+    } catch (NoRecordException ignored) {
+      // ignore
+    } catch (PathNotFoundException e) {
+      // there's no record here, it may have been deleted already.
+      // exit
+      return 0;
+    }
+
+    if (toDelete && !entries.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Match on record @ {} with children ", path);
+      }
+      // there's children
+      switch (purgePolicy) {
+        case SkipOnChildren:
+          // don't do the deletion... continue to next record
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skipping deletion");
+          }
+          toDelete = false;
+          break;
+        case PurgeAll:
+          // mark for deletion
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Scheduling for deletion with children");
+          }
+          toDelete = true;
+          entries = new ArrayList<RegistryPathStatus>(0);
+          break;
+        case FailOnChildren:
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Failing deletion operation");
+          }
+          throw new PathIsNotEmptyDirectoryException(path);
+      }
+    }
+
+    int deleteOps = 0;
+    if (toDelete) {
+      try {
+        zkDelete(path, true, callback);
+      } catch (PathNotFoundException e) {
+        // sign that the path was deleted during the operation.
+        // this is a no-op, and all children can be skipped
+        return deleteOps;
+      }
+      deleteOps++;
+    }
+
+    // now go through the children
+    for (RegistryPathStatus status : entries) {
+      String childname = status.path;
+      String childpath = RegistryPathUtils.join(path, childname);
+      deleteOps += purge(childpath,
+          selector,
+          purgePolicy,
+          callback);
+    }
+
+    return deleteOps;
+  }
+
+  /**
+   * Comparator used for purge logic
+   */
+  public interface NodeSelector {
+
+    boolean shouldSelect(String path,
+        RegistryPathStatus registryPathStatus,
+        ServiceRecord serviceRecord);
+  }
+
+  /**
+   * An async registry purge action taking
+   * a selector which decides what to delete
+   */
+  public class AsyncPurge implements Callable<Integer> {
+
+    private final BackgroundCallback callback;
+    private final NodeSelector selector;
+    private final String path;
+    private final PurgePolicy purgePolicy;
+
+    public AsyncPurge(String path,
+        NodeSelector selector,
+        PurgePolicy purgePolicy,
+        BackgroundCallback callback) {
+      this.callback = callback;
+      this.selector = selector;
+      this.path = path;
+      this.purgePolicy = purgePolicy;
+    }
+
+    @Override
+    public Integer call() throws Exception {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Executing {}", this);
+      }
+      return purge(path,
+          selector,
+          purgePolicy,
+          callback);
+    }
+
+    @Override
+    public String toString() {
+      return String.format(
+          "Record purge under %s with selector %s",
+          path, selector);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java
new file mode 100644
index 0000000..85d24b3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Basic services for the YARN registry
+ * <ul>
+ *   <li>The {@link org.apache.hadoop.registry.server.services.RegistryAdminService}</ol>
+ *   extends the shared Yarn Registry client with registry setup and
+ *   (potentially asynchronous) administrative actions.
+ *   </li>
+ *   <li>
+ *     The {@link org.apache.hadoop.registry.server.services.MicroZookeeperService}
+ *     is a transient Zookeeper instance bound to the YARN service lifecycle.
+ *     It is suitable for testing.
+ *   </li>
+ *   <li>
+ *     The {@link org.apache.hadoop.registry.server.services.AddingCompositeService}
+ *     extends the standard YARN composite service by making its add and remove
+ *     methods public. It is a utility service used in parts of the codebase
+ *   </li>
+ *
+ * </ul>
+ *
+ */
+package org.apache.hadoop.registry.server.services;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/resources/.keep
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/resources/.keep b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/resources/.keep
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
new file mode 100644
index 0000000..1c19ade
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/tla/yarnregistry.tla
@@ -0,0 +1,538 @@
+---------------------------- MODULE yarnregistry ----------------------------
+
+EXTENDS FiniteSets, Sequences, Naturals, TLC
+
+
+(*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *)
+
+(*
+
+============================================================================
+
+This defines the YARN registry in terms of operations on sets of records.
+
+Every registry entry is represented as a record containing both the path and the data.
+
+It assumes that
+
+1. operations on this set are immediate.
+2. selection operations (such as \A and \E are atomic)
+3. changes are immediately visible to all other users of the registry.
+4. This clearly implies that changes are visible in the sequence in which they happen.
+
+A multi-server Zookeeper-based registry may not meet all those assumptions
+
+1. changes may take time to propagate across the ZK quorum, hence changes cannot
+be considered immediate from the perspective of other registry clients.
+(assumptions (1) and (3)).
+
+2. Selection operations may not be atomic. (assumption (2)).
+
+Operations will still happen in the order received by the elected ZK master
+
+A stricter definition would try to state that all operations are eventually
+true excluding other changes happening during a sequence of action.
+This is left as an excercise for the reader.
+
+The specification also omits all coverage of the permissions policy.
+*)
+
+
+
+CONSTANTS
+    PathChars,     \* the set of valid characters in a path
+    Paths,         \* the set of all possible valid paths
+    Data,          \* the set of all possible sequences of bytes
+    Address,       \* the set of all possible address n-tuples
+    Addresses,     \* the set of all possible address instances
+    Endpoints ,    \* the set of all possible endpoints
+    PersistPolicies,\* the set of persistence policies
+    ServiceRecords, \* all service records
+    Registries,     \* the set of all possile registries
+    BindActions,     \* all possible put actions
+    DeleteActions,  \* all possible delete actions
+    PurgeActions,   \* all possible purge actions
+    MknodeActions    \* all possible mkdir actions
+
+
+
+(* the registry*)
+VARIABLE registry
+
+(* Sequence of actions to apply to the registry *)
+VARIABLE actions
+
+----------------------------------------------------------------------------------------
+(* Tuple of all variables.  *)
+
+
+vars == << registry, actions >>
+
+
+----------------------------------------------------------------------------------------
+
+
+
+
+(* Persistence policy *)
+PersistPolicySet == {
+    "",                      \* Undefined; field not present. PERMANENT is implied.
+    "permanent",            \* persists until explicitly removed
+    "application",          \* persists until the application finishes
+    "application-attempt",  \* persists until the application attempt finishes
+    "container"             \* persists until the container finishes
+  }
+
+(* Type invariants. *)
+TypeInvariant ==
+    /\ \A p \in PersistPolicies: p \in PersistPolicySet
+
+
+
+----------------------------------------------------------------------------------------
+
+
+
+(*
+
+An Entry is defined as a path, and the actual
+data which it contains.
+
+By including the path in an entry, we avoid having to define some
+function mapping Path -> entry.  Instead a registry can be defined as a
+set of RegistryEntries matching the validity critera.
+
+*)
+
+RegistryEntry == [
+    \* The path to the entry
+    path: Paths,
+
+    \* the data in the entry
+    data: Data
+    ]
+
+
+(*
+    An endpoint in a service record
+*)
+Endpoint == [
+    \* API of the endpoint: some identifier
+    api: STRING,
+
+    \* A list of address n-tuples
+    addresses: Addresses
+]
+
+(* Attributes are the set of all string to string mappings *)
+
+Attributes == [
+STRING |-> STRING
+]
+
+(*
+    A service record
+*)
+ServiceRecord == [
+    \* ID -used when applying the persistence policy
+    yarn_id: STRING,
+
+    \* the persistence policy
+    yarn_persistence: PersistPolicySet,
+
+    \*A description
+    description: STRING,
+
+    \* A set of endpoints
+    external: Endpoints,
+
+    \* Endpoints intended for use internally
+    internal: Endpoints,
+
+    \* Attributes are a function
+    attributes: Attributes
+]
+
+
+----------------------------------------------------------------------------------------
+
+(* Action Records *)
+
+putAction == [
+    type: "put",
+    record: ServiceRecord
+]
+
+deleteAction == [
+    type: "delete",
+    path: STRING,
+    recursive: BOOLEAN
+]
+
+purgeAction == [
+    type: "purge",
+    path: STRING,
+    persistence: PersistPolicySet
+]
+
+mkNodeAction == [
+    type: "mknode",
+    path: STRING,
+    parents: BOOLEAN
+]
+
+
+----------------------------------------------------------------------------------------
+
+(*
+
+ Path operations
+
+*)
+
+(*
+Parent is defined for non empty sequences
+ *)
+
+parent(path) == SubSeq(path, 1, Len(path)-1)
+
+isParent(path, c) == path = parent(c)
+
+----------------------------------------------------------------------------------------
+(*
+Registry Access Operations
+*)
+
+(*
+Lookup all entries in a registry with a matching path
+*)
+
+resolve(Registry, path) == \A entry \in Registry: entry.path = path
+
+(*
+A path exists in the registry iff there is an entry with that path
+*)
+
+exists(Registry, path) == resolve(Registry, path) /= {}
+
+(*
+A parent entry, or an empty set if there is none
+*)
+parentEntry(Registry, path) == resolve(Registry, parent(path))
+
+(*
+A root path is the empty sequence
+*)
+isRootPath(path) == path = <<>>
+
+(*
+The root entry is the entry whose path is the root path
+*)
+isRootEntry(entry) == entry.path = <<>>
+
+
+(*
+A path p is an ancestor of another path d if they are different, and the path d
+starts with path p
+*)
+
+isAncestorOf(path, d) ==
+    /\ path /= d
+    /\ \E k : SubSeq(d, 0, k) = path
+
+
+ancestorPathOf(path) ==
+    \A a \in Paths: isAncestorOf(a, path)
+
+(*
+The set of all children of a path in the registry
+*)
+
+children(R, path) == \A c \in R: isParent(path, c.path)
+
+(*
+A path has children if the children() function does not return the empty set
+*)
+hasChildren(R, path) == children(R, path) /= {}
+
+(*
+Descendant: a child of a path or a descendant of a child of a path
+*)
+
+descendants(R, path) == \A e \in R: isAncestorOf(path, e.path)
+
+(*
+Ancestors: all entries in the registry whose path is an entry of the path argument
+*)
+ancestors(R, path) == \A e \in R: isAncestorOf(e.path, path)
+
+(*
+The set of entries that are a path and its descendants
+*)
+pathAndDescendants(R, path) ==
+    \/ \A e \in R: isAncestorOf(path, e.path)
+    \/ resolve(R, path)
+
+
+(*
+For validity, all entries must match the following criteria
+ *)
+
+validRegistry(R) ==
+        \* there can be at most one entry for a path.
+        /\ \A e \in R: Cardinality(resolve(R, e.path)) = 1
+
+        \* There's at least one root entry
+        /\ \E e \in R: isRootEntry(e)
+
+        \* an entry must be the root entry or have a parent entry
+        /\ \A e \in R: isRootEntry(e) \/ exists(R, parent(e.path))
+
+        \* If the entry has data, it must be a service record
+        /\ \A e \in R: (e.data = << >> \/ e.data \in ServiceRecords)
+
+
+----------------------------------------------------------------------------------------
+(*
+Registry Manipulation
+*)
+
+(*
+An entry can be put into the registry iff
+its parent is present or it is the root entry
+*)
+canBind(R, e) ==
+    isRootEntry(e) \/ exists(R, parent(e.path))
+
+(*
+'bind() adds/replaces an entry if permitted
+*)
+
+bind(R, e) ==
+    /\ canBind(R, e)
+    /\ R' = (R \ resolve(R, e.path)) \union {e}
+
+
+(*
+mknode() adds a new empty entry where there was none before, iff
+-the parent exists
+-it meets the requirement for being "bindable"
+*)
+
+mknodeSimple(R, path) ==
+    LET record == [ path |-> path, data |-> <<>>  ]
+    IN  \/ exists(R, path)
+        \/ (exists(R, parent(path))  /\ canBind(R, record) /\ (R' = R \union {record} ))
+
+
+(*
+For all parents, the mknodeSimpl() criteria must apply.
+This could be defined recursively, though as TLA+ does not support recursion,
+an alternative is required
+
+
+Because this specification is declaring the final state of a operation, not
+the implemental, all that is needed is to describe those parents.
+
+It declares that the mkdirSimple state applies to the path and all its parents in the set R'
+
+*)
+mknodeWithParents(R, path) ==
+    /\ \A p2 \in ancestors(R, path) : mknodeSimple(R, p2)
+    /\ mknodeSimple(R, path)
+
+
+mknode(R, path, recursive) ==
+   IF recursive THEN mknodeWithParents(R, path) ELSE mknodeSimple(R, path)
+
+(*
+Deletion is set difference on any existing entries
+*)
+
+simpleDelete(R, path) ==
+    /\ ~isRootPath(path)
+    /\ children(R, path) = {}
+    /\ R' = R \ resolve(R, path)
+
+(*
+Recursive delete: neither the path or its descendants exists in the new registry
+*)
+
+recursiveDelete(R, path) ==
+       \* Root path: the new registry is the initial registry again
+    /\ isRootPath(path) => R' = { [ path |-> <<>>, data |-> <<>> ] }
+       \*  Any other entry: the new registry is a set with any existing
+       \* entry for that path is removed, and the new entry added
+    /\ ~isRootPath(path) => R' = R \ ( resolve(R, path) \union descendants(R, path))
+
+
+(*
+Delete operation which chooses the recursiveness policy based on an argument
+*)
+
+delete(R, path, recursive) ==
+    IF recursive THEN recursiveDelete(R, path) ELSE simpleDelete(R, path)
+
+
+(*
+Purge ensures that all entries under a path with the matching ID and policy are not there
+afterwards
+*)
+
+purge(R, path, id, persistence) ==
+    /\ (persistence \in PersistPolicySet)
+    /\ \A p2 \in pathAndDescendants(R, path) :
+         (p2.attributes["yarn:id"] = id /\ p2.attributes["yarn:persistence"] = persistence)
+         => recursiveDelete(R, p2.path)
+
+(*
+resolveRecord() resolves the record at a path or fails.
+
+It relies on the fact that if the cardinality of a set is 1, then the CHOOSE operator
+is guaranteed to return the single entry of that set, iff the choice predicate holds.
+
+Using a predicate of TRUE, it always succeeds, so this function selects
+the sole entry of the resolve operation.
+*)
+
+resolveRecord(R, path) ==
+    LET l == resolve(R, path) IN
+        /\ Cardinality(l) = 1
+        /\ CHOOSE e \in l : TRUE
+
+(*
+The specific action of putting an entry into a record includes validating the record
+*)
+
+validRecordToBind(path, record) ==
+      \* The root entry must have permanent persistence
+     isRootPath(path) => (record.attributes["yarn:persistence"] = "permanent"
+     \/ record.attributes["yarn:persistence"] = "")
+
+
+(*
+Binding a service record involves validating it then putting it in the registry
+marshalled as the data in the entry
+ *)
+bindRecord(R, path, record) ==
+    /\ validRecordToBind(path, record)
+    /\ bind(R, [path |-> path, data |-> record])
+
+
+----------------------------------------------------------------------------------------
+
+
+
+(*
+The action queue can only contain one of the sets of action types, and
+by giving each a unique name, those sets are guaranteed to be disjoint
+*)
+ QueueInvariant ==
+    /\ \A a \in actions:
+        \/ (a \in BindActions /\ a.type="bind")
+        \/ (a \in DeleteActions /\ a.type="delete")
+        \/ (a \in PurgeActions /\ a.type="purge")
+        \/ (a \in MknodeActions /\ a.type="mknode")
+
+
+(*
+Applying queued actions
+*)
+
+applyAction(R, a) ==
+    \/ (a \in BindActions /\ bindRecord(R, a.path, a.record) )
+    \/ (a \in MknodeActions /\ mknode(R, a.path, a.recursive) )
+    \/ (a \in DeleteActions /\ delete(R, a.path, a.recursive) )
+    \/ (a \in PurgeActions /\ purge(R, a.path, a.id, a.persistence))
+
+
+(*
+Apply the first action in a list and then update the actions
+*)
+applyFirstAction(R, a) ==
+    /\ actions /= <<>>
+    /\ applyAction(R, Head(a))
+    /\ actions' = Tail(a)
+
+
+Next == applyFirstAction(registry, actions)
+
+(*
+All submitted actions must eventually be applied.
+*)
+
+
+Liveness == <>( actions = <<>> )
+
+
+(*
+The initial state of a registry has the root entry.
+*)
+
+InitialRegistry == registry = {
+  [ path |-> <<>>, data |-> <<>> ]
+}
+
+
+(*
+The valid state of the "registry" variable is defined as
+Via the validRegistry predicate
+*)
+
+ValidRegistryState == validRegistry(registry)
+
+
+
+(*
+The initial state of the system
+*)
+InitialState ==
+    /\ InitialRegistry
+    /\ ValidRegistryState
+    /\ actions = <<>>
+
+
+(*
+The registry has an initial state, the series of state changes driven by the actions,
+and the requirement that it does act on those actions.
+*)
+RegistrySpec ==
+    /\ InitialState
+    /\ [][Next]_vars
+    /\ Liveness
+
+
+----------------------------------------------------------------------------------------
+
+(*
+Theorem: For all operations from that initial state, the registry state is still valid
+*)
+THEOREM InitialState => [] ValidRegistryState
+
+(*
+Theorem: for all operations from that initial state, the type invariants hold
+*)
+THEOREM InitialState => [] TypeInvariant
+
+(*
+Theorem: the queue invariants hold
+*)
+THEOREM InitialState => [] QueueInvariant
+
+=============================================================================

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java
new file mode 100644
index 0000000..5b34f60
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry;
+
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+/**
+ * Abstract registry tests .. inits the field {@link #registry}
+ * before the test with an instance of {@link RMRegistryOperationsService};
+ * and {@link #operations} with the same instance cast purely
+ * to the type {@link RegistryOperations}.
+ *
+ */
+public class AbstractRegistryTest extends AbstractZKRegistryTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractRegistryTest.class);
+  protected RMRegistryOperationsService registry;
+  protected RegistryOperations operations;
+
+  @Before
+  public void setupRegistry() throws IOException {
+    registry = new RMRegistryOperationsService("yarnRegistry");
+    operations = registry;
+    registry.init(createRegistryConfiguration());
+    registry.start();
+    operations.delete("/", true);
+    registry.createRootRegistryPaths();
+    addToTeardown(registry);
+  }
+
+  /**
+   * Create a service entry with the sample endpoints, and put it
+   * at the destination
+   * @param path path
+   * @param createFlags flags
+   * @return the record
+   * @throws IOException on a failure
+   */
+  protected ServiceRecord putExampleServiceEntry(String path, int createFlags) throws
+      IOException,
+      URISyntaxException {
+    return putExampleServiceEntry(path, createFlags, PersistencePolicies.PERMANENT);
+  }
+
+  /**
+   * Create a service entry with the sample endpoints, and put it
+   * at the destination
+   * @param path path
+   * @param createFlags flags
+   * @return the record
+   * @throws IOException on a failure
+   */
+  protected ServiceRecord putExampleServiceEntry(String path,
+      int createFlags,
+      String persistence)
+      throws IOException, URISyntaxException {
+    ServiceRecord record = buildExampleServiceEntry(persistence);
+
+    registry.mknode(RegistryPathUtils.parentOf(path), true);
+    operations.bind(path, record, createFlags);
+    return record;
+  }
+
+  /**
+   * Assert a path exists
+   * @param path path in the registry
+   * @throws IOException
+   */
+  public void assertPathExists(String path) throws IOException {
+    operations.stat(path);
+  }
+
+  /**
+   * assert that a path does not exist
+   * @param path path in the registry
+   * @throws IOException
+   */
+  public void assertPathNotFound(String path) throws IOException {
+    try {
+      operations.stat(path);
+      fail("Path unexpectedly found: " + path);
+    } catch (PathNotFoundException e) {
+
+    }
+  }
+
+  /**
+   * Assert that a path resolves to a service record
+   * @param path path in the registry
+   * @throws IOException
+   */
+  public void assertResolves(String path) throws IOException {
+    operations.resolve(path);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java
new file mode 100644
index 0000000..bcff622
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.server.services.AddingCompositeService;
+import org.apache.hadoop.registry.server.services.MicroZookeeperService;
+import org.apache.hadoop.registry.server.services.MicroZookeeperServiceKeys;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+public class AbstractZKRegistryTest extends RegistryTestHelper {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractZKRegistryTest.class);
+
+  private static final AddingCompositeService servicesToTeardown =
+      new AddingCompositeService("teardown");
+  // static initializer guarantees it is always started
+  // ahead of any @BeforeClass methods
+  static {
+    servicesToTeardown.init(new Configuration());
+    servicesToTeardown.start();
+  }
+
+  @Rule
+  public final Timeout testTimeout = new Timeout(10000);
+
+  @Rule
+  public TestName methodName = new TestName();
+
+  protected static void addToTeardown(Service svc) {
+    servicesToTeardown.addService(svc);
+  }
+
+  @AfterClass
+  public static void teardownServices() throws IOException {
+    describe(LOG, "teardown of static services");
+    servicesToTeardown.close();
+  }
+
+  protected static MicroZookeeperService zookeeper;
+
+
+  @BeforeClass
+  public static void createZKServer() throws Exception {
+    File zkDir = new File("target/zookeeper");
+    FileUtils.deleteDirectory(zkDir);
+    assertTrue(zkDir.mkdirs());
+    zookeeper = new MicroZookeeperService("InMemoryZKService");
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MicroZookeeperServiceKeys.KEY_ZKSERVICE_DIR, zkDir.getAbsolutePath());
+    zookeeper.init(conf);
+    zookeeper.start();
+    addToTeardown(zookeeper);
+  }
+
+  /**
+   * give our thread a name
+   */
+  @Before
+  public void nameThread() {
+    Thread.currentThread().setName("JUnit");
+  }
+
+  /**
+   * Returns the connection string to use
+   *
+   * @return connection string
+   */
+  public String getConnectString() {
+    return zookeeper.getConnectionString();
+  }
+
+  public YarnConfiguration createRegistryConfiguration() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_CONNECTION_TIMEOUT, 1000);
+    conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_INTERVAL, 500);
+    conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_TIMES, 10);
+    conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_CEILING, 10);
+    conf.set(RegistryConstants.KEY_REGISTRY_ZK_QUORUM,
+        zookeeper.getConnectionString());
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
new file mode 100644
index 0000000..38cc2cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/RegistryTestHelper.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ProtocolTypes;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.registry.secure.AbstractSecureRegistryTest;
+import org.apache.zookeeper.common.PathUtils;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.inetAddrEndpoint;
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.ipcEndpoint;
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.restEndpoint;
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.tuple;
+import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.webEndpoint;
+
+/**
+ * This is a set of static methods to aid testing the registry operations.
+ * The methods can be imported statically —or the class used as a base
+ * class for tests.
+ */
+public class RegistryTestHelper extends Assert {
+  public static final String SC_HADOOP = "org-apache-hadoop";
+  public static final String USER = "devteam/";
+  public static final String NAME = "hdfs";
+  public static final String API_WEBHDFS = "org_apache_hadoop_namenode_webhdfs";
+  public static final String API_HDFS = "org_apache_hadoop_namenode_dfs";
+  public static final String USERPATH = RegistryConstants.PATH_USERS + USER;
+  public static final String PARENT_PATH = USERPATH + SC_HADOOP + "/";
+  public static final String ENTRY_PATH = PARENT_PATH + NAME;
+  public static final String NNIPC = "nnipc";
+  public static final String IPC2 = "IPC2";
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RegistryTestHelper.class);
+  public static final String KTUTIL = "ktutil";
+  private static final RegistryUtils.ServiceRecordMarshal recordMarshal =
+      new RegistryUtils.ServiceRecordMarshal();
+
+  /**
+   * Assert the path is valid by ZK rules
+   * @param path path to check
+   */
+  public static void assertValidZKPath(String path) {
+    try {
+      PathUtils.validatePath(path);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Invalid Path " + path + ": " + e, e);
+    }
+  }
+
+  /**
+   * Assert that a string is not empty (null or "")
+   * @param message message to raise if the string is empty
+   * @param check string to check
+   */
+  public static void assertNotEmpty(String message, String check) {
+    if (StringUtils.isEmpty(check)) {
+      fail(message);
+    }
+  }
+
+  /**
+   * Assert that a string is empty (null or "")
+   * @param check string to check
+   */
+  public static void assertNotEmpty(String check) {
+    if (StringUtils.isEmpty(check)) {
+      fail("Empty string");
+    }
+  }
+
+  /**
+   * Log the details of a login context
+   * @param name name to assert that the user is logged in as
+   * @param loginContext the login context
+   */
+  public static void logLoginDetails(String name,
+      LoginContext loginContext) {
+    assertNotNull("Null login context", loginContext);
+    Subject subject = loginContext.getSubject();
+    LOG.info("Logged in as {}:\n {}", name, subject);
+  }
+
+  /**
+   * Set the JVM property to enable Kerberos debugging
+   */
+  public static void enableKerberosDebugging() {
+    System.setProperty(AbstractSecureRegistryTest.SUN_SECURITY_KRB5_DEBUG,
+        "true");
+  }
+  /**
+   * Set the JVM property to enable Kerberos debugging
+   */
+  public static void disableKerberosDebugging() {
+    System.setProperty(AbstractSecureRegistryTest.SUN_SECURITY_KRB5_DEBUG,
+        "false");
+  }
+
+  /**
+   * General code to validate bits of a component/service entry built iwth
+   * {@link #addSampleEndpoints(ServiceRecord, String)}
+   * @param record instance to check
+   */
+  public static void validateEntry(ServiceRecord record) {
+    assertNotNull("null service record", record);
+    List<Endpoint> endpoints = record.external;
+    assertEquals(2, endpoints.size());
+
+    Endpoint webhdfs = findEndpoint(record, API_WEBHDFS, true, 1, 1);
+    assertEquals(API_WEBHDFS, webhdfs.api);
+    assertEquals(AddressTypes.ADDRESS_URI, webhdfs.addressType);
+    assertEquals(ProtocolTypes.PROTOCOL_REST, webhdfs.protocolType);
+    List<List<String>> addressList = webhdfs.addresses;
+    List<String> url = addressList.get(0);
+    String addr = url.get(0);
+    assertTrue(addr.contains("http"));
+    assertTrue(addr.contains(":8020"));
+
+    Endpoint nnipc = findEndpoint(record, NNIPC, false, 1,2);
+    assertEquals("wrong protocol in " + nnipc, ProtocolTypes.PROTOCOL_THRIFT,
+        nnipc.protocolType);
+
+    Endpoint ipc2 = findEndpoint(record, IPC2, false, 1,2);
+
+    Endpoint web = findEndpoint(record, "web", true, 1, 1);
+    assertEquals(1, web.addresses.size());
+    assertEquals(1, web.addresses.get(0).size());
+  }
+
+  /**
+   * Assert that an endpoint matches the criteria
+   * @param endpoint endpoint to examine
+   * @param addressType expected address type
+   * @param protocolType expected protocol type
+   * @param api API
+   */
+  public static void assertMatches(Endpoint endpoint,
+      String addressType,
+      String protocolType,
+      String api) {
+    assertNotNull(endpoint);
+    assertEquals(addressType, endpoint.addressType);
+    assertEquals(protocolType, endpoint.protocolType);
+    assertEquals(api, endpoint.api);
+  }
+
+  /**
+   * Assert the records match.
+   * @param source record that was written
+   * @param resolved the one that resolved.
+   */
+  public static void assertMatches(ServiceRecord source, ServiceRecord resolved) {
+    assertNotNull("Null source record ", source);
+    assertNotNull("Null resolved record ", resolved);
+    assertEquals(source.description, resolved.description);
+
+    Map<String, String> srcAttrs = source.attributes();
+    Map<String, String> resolvedAttrs = resolved.attributes();
+    String sourceAsString = source.toString();
+    String resolvedAsString = resolved.toString();
+    assertEquals("Wrong count of attrs in \n" + sourceAsString
+                 + "\nfrom\n" + resolvedAsString,
+        srcAttrs.size(),
+        resolvedAttrs.size());
+    for (Map.Entry<String, String> entry : srcAttrs.entrySet()) {
+      String attr = entry.getKey();
+      assertEquals("attribute "+ attr, entry.getValue(), resolved.get(attr));
+    }
+    assertEquals("wrong external endpoint count",
+        source.external.size(), resolved.external.size());
+    assertEquals("wrong external endpoint count",
+        source.internal.size(), resolved.internal.size());
+  }
+
+  /**
+   * Find an endpoint in a record or fail,
+   * @param record record
+   * @param api API
+   * @param external external?
+   * @param addressElements expected # of address elements?
+   * @param addressTupleSize expected size of a type
+   * @return the endpoint.
+   */
+  public static Endpoint findEndpoint(ServiceRecord record,
+      String api, boolean external, int addressElements, int addressTupleSize) {
+    Endpoint epr = external ? record.getExternalEndpoint(api)
+                            : record.getInternalEndpoint(api);
+    if (epr != null) {
+      assertEquals("wrong # of addresses",
+          addressElements, epr.addresses.size());
+      assertEquals("wrong # of elements in an address tuple",
+          addressTupleSize, epr.addresses.get(0).size());
+      return epr;
+    }
+    List<Endpoint> endpoints = external ? record.external : record.internal;
+    StringBuilder builder = new StringBuilder();
+    for (Endpoint endpoint : endpoints) {
+      builder.append("\"").append(endpoint).append("\" ");
+    }
+    fail("Did not find " + api + " in endpoints " + builder);
+    // never reached; here to keep the compiler happy
+    return null;
+  }
+
+  /**
+   * Log a record
+   * @param name record name
+   * @param record details
+   * @throws IOException only if something bizarre goes wrong marshalling
+   * a record.
+   */
+  public static void logRecord(String name, ServiceRecord record) throws
+      IOException {
+    LOG.info(" {} = \n{}\n", name, recordMarshal.toJson(record));
+  }
+
+  /**
+   * Create a service entry with the sample endpoints
+   * @param persistence persistence policy
+   * @return the record
+   * @throws IOException on a failure
+   */
+  public static ServiceRecord buildExampleServiceEntry(String persistence) throws
+      IOException,
+      URISyntaxException {
+    ServiceRecord record = new ServiceRecord();
+    record.set(YarnRegistryAttributes.YARN_ID, "example-0001");
+    record.set(YarnRegistryAttributes.YARN_PERSISTENCE, persistence);
+    addSampleEndpoints(record, "namenode");
+    return record;
+  }
+
+  /**
+   * Add some endpoints
+   * @param entry entry
+   */
+  public static void addSampleEndpoints(ServiceRecord entry, String hostname)
+      throws URISyntaxException {
+    assertNotNull(hostname);
+    entry.addExternalEndpoint(webEndpoint("web",
+        new URI("http", hostname + ":80", "/")));
+    entry.addExternalEndpoint(
+        restEndpoint(API_WEBHDFS,
+            new URI("http", hostname + ":8020", "/")));
+
+    Endpoint endpoint = ipcEndpoint(API_HDFS, true, null);
+    endpoint.addresses.add(tuple(hostname, "8030"));
+    entry.addInternalEndpoint(endpoint);
+    InetSocketAddress localhost = new InetSocketAddress("localhost", 8050);
+    entry.addInternalEndpoint(
+        inetAddrEndpoint(NNIPC, ProtocolTypes.PROTOCOL_THRIFT, "localhost",
+            8050));
+    entry.addInternalEndpoint(
+        RegistryTypeUtils.ipcEndpoint(
+            IPC2,
+            true,
+            RegistryTypeUtils.marshall(localhost)));
+  }
+
+  /**
+   * Describe the stage in the process with a box around it -so as
+   * to highlight it in test logs
+   * @param log log to use
+   * @param text text
+   * @param args logger args
+   */
+  public static void describe(Logger log, String text, Object...args) {
+    log.info("\n=======================================");
+    log.info(text, args);
+    log.info("=======================================\n");
+  }
+
+  /**
+   * log out from a context if non-null ... exceptions are caught and logged
+   * @param login login context
+   * @return null, always
+   */
+  public static LoginContext logout(LoginContext login) {
+    try {
+      if (login != null) {
+        LOG.debug("Logging out login context {}", login.toString());
+        login.logout();
+      }
+    } catch (LoginException e) {
+      LOG.warn("Exception logging out: {}", e, e);
+    }
+    return null;
+  }
+
+  /**
+   * Exec the native <code>ktutil</code> to list the keys
+   * (primarily to verify that the generated keytabs are compatible).
+   * This operation is not executed on windows. On other platforms
+   * it requires <code>ktutil</code> to be installed and on the path
+   * <pre>
+   *   ktutil --keytab=target/kdc/zookeeper.keytab list --keys
+   * </pre>
+   * @param keytab keytab to list
+   * @throws IOException on any execution problem, including the executable
+   * being missing
+   */
+  public static String ktList(File keytab) throws IOException {
+    if (!Shell.WINDOWS) {
+      String path = keytab.getAbsolutePath();
+      String out = Shell.execCommand(
+          KTUTIL,
+          "--keytab=" + path,
+          "list",
+          "--keys"
+      );
+      LOG.info("Listing of keytab {}:\n{}\n", path, out);
+      return out;
+    }
+    return "";
+  }
+
+  /**
+   * Perform a robust <code>ktutils -l</code> ... catches and ignores
+   * exceptions, otherwise the output is logged.
+   * @param keytab keytab to list
+   * @return the result of the operation, or "" on any problem
+   */
+  public static String ktListRobust(File keytab) {
+    try {
+      return ktList(keytab);
+    } catch (IOException e) {
+      // probably not on the path
+      return "";
+    }
+  }
+
+  /**
+   * Login via a UGI. Requres UGI to have been set up
+   * @param user username
+   * @param keytab keytab to list
+   * @return the UGI
+   * @throws IOException
+   */
+  public static UserGroupInformation loginUGI(String user, File keytab) throws
+      IOException {
+    LOG.info("Logging in as {} from {}", user, keytab);
+    return UserGroupInformation.loginUserFromKeytabAndReturnUGI(user,
+        keytab.getAbsolutePath());
+  }
+
+  public static ServiceRecord createRecord(String persistence) {
+    return createRecord("01", persistence, "description");
+  }
+
+  public static ServiceRecord createRecord(String id, String persistence,
+      String description) {
+    ServiceRecord serviceRecord = new ServiceRecord();
+    serviceRecord.set(YarnRegistryAttributes.YARN_ID, id);
+    serviceRecord.description = description;
+    serviceRecord.set(YarnRegistryAttributes.YARN_PERSISTENCE, persistence);
+    return serviceRecord;
+  }
+
+  public static ServiceRecord createRecord(String id, String persistence,
+      String description, String data) {
+    return createRecord(id, persistence, description);
+  }
+}


Mime
View raw message