hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject [62/74] [abbrv] hadoop git commit: YARN-7091. Rename application to service in yarn-native-services. Contributed by Jian He
Date Tue, 29 Aug 2017 22:21:11 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java
new file mode 100644
index 0000000..add2475
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.registry;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.registry.client.api.BindFlags;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
+import org.apache.hadoop.yarn.service.utils.SliderUtils;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.registry.client.binding.RegistryPathUtils.join;
+
+/**
+ * Registry view for providers. This tracks where the service
+ * is registered, offers access to the record and other things.
+ */
+public class YarnRegistryViewForProviders {
+  private static final Log LOG =
+      LogFactory.getLog(YarnRegistryViewForProviders.class);
+
+  private final RegistryOperations registryOperations;
+  private final String user;
+  private final String sliderServiceClass;
+  private final String instanceName;
+  /**
+   * Record used where the service registered itself.
+   * Null until the service is registered
+   */
+  private ServiceRecord selfRegistration;
+
+  /**
+   * Path where record was registered
+   * Null until the service is registered
+   */
+  private String selfRegistrationPath;
+
+  public YarnRegistryViewForProviders(RegistryOperations registryOperations,
+      String user,
+      String sliderServiceClass,
+      String instanceName,
+      ApplicationAttemptId applicationAttemptId) {
+    Preconditions.checkArgument(registryOperations != null,
+        "null registry operations");
+    Preconditions.checkArgument(user != null, "null user");
+    Preconditions.checkArgument(SliderUtils.isSet(sliderServiceClass),
+        "unset service class");
+    Preconditions.checkArgument(SliderUtils.isSet(instanceName),
+        "instanceName");
+    Preconditions.checkArgument(applicationAttemptId != null,
+        "null applicationAttemptId");
+    this.registryOperations = registryOperations;
+    this.user = user;
+    this.sliderServiceClass = sliderServiceClass;
+    this.instanceName = instanceName;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+
+  private void setSelfRegistration(ServiceRecord selfRegistration) {
+    this.selfRegistration = selfRegistration;
+  }
+
+  /**
+   * Get the path to where the service has registered itself.
+   * Null until the service is registered
+   * @return the service registration path.
+   */
+  public String getSelfRegistrationPath() {
+    return selfRegistrationPath;
+  }
+
+  /**
+   * Get the absolute path to where the service has registered itself.
+   * This includes the base registry path
+   * Null until the service is registered
+   * @return the service registration path.
+   */
+  public String getAbsoluteSelfRegistrationPath() {
+    if (selfRegistrationPath == null) {
+      return null;
+    }
+    String root = registryOperations.getConfig().getTrimmed(
+        RegistryConstants.KEY_REGISTRY_ZK_ROOT,
+        RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
+    return RegistryPathUtils.join(root, selfRegistrationPath);
+  }
+
+  /**
+   * Add a component under the slider name/entry
+   * @param componentName component name
+   * @param record record to put
+   * @throws IOException
+   */
+  public void putComponent(String componentName,
+      ServiceRecord record) throws
+      IOException {
+    putComponent(sliderServiceClass, instanceName,
+        componentName,
+        record);
+  }
+
+  /**
+   * Add a component 
+   * @param serviceClass service class to use under ~user
+   * @param componentName component name
+   * @param record record to put
+   * @throws IOException
+   */
+  public void putComponent(String serviceClass,
+      String serviceName,
+      String componentName,
+      ServiceRecord record) throws IOException {
+    String path = RegistryUtils.componentPath(
+        user, serviceClass, serviceName, componentName);
+    registryOperations.mknode(RegistryPathUtils.parentOf(path), true);
+    registryOperations.bind(path, record, BindFlags.OVERWRITE);
+  }
+    
+  /**
+   * Add a service under a path, optionally purging any history
+   * @param username user
+   * @param serviceClass service class to use under ~user
+   * @param serviceName name of the service
+   * @param record service record
+   * @param deleteTreeFirst perform recursive delete of the path first.
+   * @return the path the service was created at
+   * @throws IOException
+   */
+  public String putService(String username,
+      String serviceClass,
+      String serviceName,
+      ServiceRecord record,
+      boolean deleteTreeFirst) throws IOException {
+    String path = RegistryUtils.servicePath(
+        username, serviceClass, serviceName);
+    if (deleteTreeFirst) {
+      registryOperations.delete(path, true);
+    }
+    registryOperations.mknode(RegistryPathUtils.parentOf(path), true);
+    registryOperations.bind(path, record, BindFlags.OVERWRITE);
+    return path;
+  }
+
+  /**
+   * Add a service under a path for the current user
+   * @param record service record
+   * @param deleteTreeFirst perform recursive delete of the path first
+   * @return the path the service was created at
+   * @throws IOException
+   */
+  public String registerSelf(
+      ServiceRecord record,
+      boolean deleteTreeFirst) throws IOException {
+    selfRegistrationPath =
+        putService(user, sliderServiceClass, instanceName, record, deleteTreeFirst);
+    setSelfRegistration(record);
+    return selfRegistrationPath;
+  }
+
+  /**
+   * Delete a component
+   * @param containerId component name
+   * @throws IOException
+   */
+  public void deleteComponent(ComponentInstanceId instanceId,
+      String containerId) throws IOException {
+    String path = RegistryUtils.componentPath(
+        user, sliderServiceClass, instanceName,
+        containerId);
+    LOG.info(instanceId + ": Deleting registry path " + path);
+    registryOperations.delete(path, false);
+  }
+
+  /**
+   * Delete the children of a path -but not the path itself.
+   * It is not an error if the path does not exist
+   * @param path path to delete
+   * @param recursive flag to request recursive deletes
+   * @throws IOException IO problems
+   */
+  public void deleteChildren(String path, boolean recursive) throws IOException {
+    List<String> childNames = null;
+    try {
+      childNames = registryOperations.list(path);
+    } catch (PathNotFoundException e) {
+      return;
+    }
+    for (String childName : childNames) {
+      String child = join(path, childName);
+      registryOperations.delete(child, recursive);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceMetricsSink.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceMetricsSink.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceMetricsSink.java
new file mode 100644
index 0000000..cf4e836
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceMetricsSink.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.timelineservice;
+
+import org.apache.commons.configuration2.SubsetConfiguration;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Write the metrics to a ATSv2. Generally, this class is instantiated via
+ * hadoop-metrics2 property files. Specifically, you would create this class by
+ * adding the following to by This would actually be set as: <code>
+ * [prefix].sink.[some instance name].class
+ * =org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink
+ * </code>, where <tt>prefix</tt> is "atsv2": and <tt>some instance name</tt> is
+ * just any unique name, so properties can be differentiated if there are
+ * multiple sinks of the same type created
+ */
+public class ServiceMetricsSink implements MetricsSink {
+
+  private static final Logger log =
+      LoggerFactory.getLogger(ServiceMetricsSink.class);
+
+  private ServiceTimelinePublisher serviceTimelinePublisher;
+
+  public ServiceMetricsSink() {
+
+  }
+
+  public ServiceMetricsSink(ServiceTimelinePublisher publisher) {
+    serviceTimelinePublisher = publisher;
+  }
+
+  /**
+   * Publishes service and component metrics to ATS.
+   */
+  @Override
+  public void putMetrics(MetricsRecord record) {
+    if (serviceTimelinePublisher.isStopped()) {
+      log.warn("ServiceTimelinePublisher has stopped. "
+          + "Not publishing any more metrics to ATS.");
+      return;
+    }
+
+    boolean isServiceMetrics = false;
+    boolean isComponentMetrics = false;
+    String appId = null;
+    for (MetricsTag tag : record.tags()) {
+      if (tag.name().equals("type") && tag.value().equals("service")) {
+        isServiceMetrics = true;
+      } else if (tag.name().equals("type") && tag.value().equals("component")) {
+        isComponentMetrics = true;
+        break; // if component metrics, no more information required from tag so
+               // break the loop
+      } else if (tag.name().equals("appId")) {
+        appId = tag.value();
+      }
+    }
+
+    if (isServiceMetrics && appId != null) {
+      if (log.isDebugEnabled()) {
+        log.debug("Publishing service metrics. " + record);
+      }
+      serviceTimelinePublisher.publishMetrics(record.metrics(), appId,
+          ServiceTimelineEntityType.SERVICE_ATTEMPT.toString(),
+          record.timestamp());
+    } else if (isComponentMetrics) {
+      if (log.isDebugEnabled()) {
+        log.debug("Publishing Component metrics. " + record);
+      }
+      serviceTimelinePublisher.publishMetrics(record.metrics(), record.name(),
+          ServiceTimelineEntityType.COMPONENT.toString(), record.timestamp());
+    }
+  }
+
+  @Override
+  public void init(SubsetConfiguration conf) {
+  }
+
+  @Override
+  public void flush() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEntityType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEntityType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEntityType.java
new file mode 100644
index 0000000..d5c9539
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEntityType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.timelineservice;
+
+/**
+ * Slider entities that are published to ATS.
+ */
+public enum ServiceTimelineEntityType {
+  /**
+   * Used for publishing service entity information.
+   */
+  SERVICE_ATTEMPT,
+
+  /**
+   * Used for publishing component entity information.
+   */
+  COMPONENT,
+
+  /**
+   * Used for publishing component instance entity information.
+   */
+  COMPONENT_INSTANCE
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java
new file mode 100644
index 0000000..7f7f9a1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.timelineservice;
+
+/**
+ * Events that are used to store in ATS.
+ */
+public enum ServiceTimelineEvent {
+  SERVICE_ATTEMPT_REGISTERED,
+
+  SERVICE_ATTEMPT_UNREGISTERED,
+
+  COMPONENT_INSTANCE_REGISTERED,
+
+  COMPONENT_INSTANCE_UNREGISTERED,
+
+  COMPONENT_INSTANCE_UPDATED
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java
new file mode 100644
index 0000000..78a7171
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelineMetricsConstants.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.timelineservice;
+
+/**
+ * Constants which are stored as key in ATS
+ */
+public final class ServiceTimelineMetricsConstants {
+
+  public static final String URI = "URI";
+
+  public static final String NAME = "NAME";
+
+  public static final String STATE = "STATE";
+
+  public static final String EXIT_STATUS_CODE = "EXIT_STATUS_CODE";
+
+  public static final String EXIT_REASON = "EXIT_REASON";
+
+  public static final String DIAGNOSTICS_INFO = "DIAGNOSTICS_INFO";
+
+  public static final String LAUNCH_TIME = "LAUNCH_TIME";
+
+  public static final String QUICK_LINKS = "QUICK_LINKS";
+
+  public static final String LAUNCH_COMMAND = "LAUNCH_COMMAND";
+
+  public static final String TOTAL_CONTAINERS = "NUMBER_OF_CONTAINERS";
+
+  public static final String RUNNING_CONTAINERS =
+      "NUMBER_OF_RUNNING_CONTAINERS";
+
+  /**
+   * Artifacts constants.
+   */
+  public static final String ARTIFACT_ID = "ARTIFACT_ID";
+
+  public static final String ARTIFACT_TYPE = "ARTIFACT_TYPE";
+
+  public static final String ARTIFACT_URI = "ARTIFACT_URI";
+
+  /**
+   * Resource constants.
+   */
+  public static final String RESOURCE_CPU = "RESOURCE_CPU";
+
+  public static final String RESOURCE_MEMORY = "RESOURCE_MEMORY";
+
+  public static final String RESOURCE_PROFILE = "RESOURCE_PROFILE";
+
+  /**
+   * component instance constants.
+   */
+  public static final String IP = "IP";
+
+  public static final String HOSTNAME = "HOSTNAME";
+
+  public static final String BARE_HOST = "BARE_HOST";
+
+  public static final String COMPONENT_NAME = "COMPONENT_NAME";
+
+  public static final String COMPONENT_INSTANCE_NAME = "COMPONENT_INSTANCE_NAME";
+
+  /**
+   * component constants.
+   */
+  public static final String DEPENDENCIES = "DEPENDENCIES";
+
+  public static final String DESCRIPTION = "DESCRIPTION";
+
+  public static final String RUN_PRIVILEGED_CONTAINER =
+      "RUN_PRIVILEGED_CONTAINER";
+
+  public static final String PLACEMENT_POLICY = "PLACEMENT_POLICY";
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
new file mode 100644
index 0000000..5e65ad9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/ServiceTimelinePublisher.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.timelineservice;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.client.api.TimelineV2Client;
+import org.apache.hadoop.yarn.service.ServiceContext;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.ConfigFile;
+import org.apache.hadoop.yarn.service.api.records.Configuration;
+import org.apache.hadoop.yarn.service.api.records.Container;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import static org.apache.hadoop.yarn.service.timelineservice.ServiceTimelineMetricsConstants.DIAGNOSTICS_INFO;
+
+/**
+ * A single service that publishes all the Timeline Entities.
+ */
+public class ServiceTimelinePublisher extends CompositeService {
+
+  // Number of bytes of config which can be published in one shot to ATSv2.
+  public static final int ATS_CONFIG_PUBLISH_SIZE_BYTES = 10 * 1024;
+
+  private TimelineV2Client timelineClient;
+
+  private volatile boolean stopped = false;
+
+  private static final Logger log =
+      LoggerFactory.getLogger(ServiceTimelinePublisher.class);
+
+  @Override
+  protected void serviceInit(org.apache.hadoop.conf.Configuration configuration)
+      throws Exception {
+    addService(timelineClient);
+  }
+
+
+  @Override
+  protected void serviceStop() throws Exception {
+    stopped = true;
+    super.serviceStop();
+  }
+
+  public boolean isStopped() {
+    return stopped;
+  }
+
+  public ServiceTimelinePublisher(TimelineV2Client client) {
+    super(ServiceTimelinePublisher.class.getName());
+    timelineClient = client;
+  }
+
+  public void serviceAttemptRegistered(Service service,
+      org.apache.hadoop.conf.Configuration systemConf) {
+    long currentTimeMillis = service.getLaunchTime() == null
+        ? System.currentTimeMillis() : service.getLaunchTime().getTime();
+
+    TimelineEntity entity = createServiceAttemptEntity(service.getId());
+    entity.setCreatedTime(currentTimeMillis);
+
+    // create info keys
+    Map<String, Object> entityInfos = new HashMap<String, Object>();
+    entityInfos.put(ServiceTimelineMetricsConstants.NAME, service.getName());
+    entityInfos.put(ServiceTimelineMetricsConstants.STATE,
+        service.getState().toString());
+    entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_TIME,
+        currentTimeMillis);
+    entity.addInfo(ServiceTimelineMetricsConstants.QUICK_LINKS,
+        service.getQuicklinks());
+    entity.addInfo(entityInfos);
+
+    // add an event
+    TimelineEvent startEvent = new TimelineEvent();
+    startEvent.setId(ServiceTimelineEvent.SERVICE_ATTEMPT_REGISTERED.toString());
+    startEvent.setTimestamp(currentTimeMillis);
+    entity.addEvent(startEvent);
+
+    // publish before configurations published
+    putEntity(entity);
+
+    // publish system config - YarnConfiguration
+    populateTimelineEntity(systemConf.iterator(), service.getId(),
+        ServiceTimelineEntityType.SERVICE_ATTEMPT.toString());
+    // publish user conf
+    publishUserConf(service.getConfiguration(), service.getId(),
+        ServiceTimelineEntityType.SERVICE_ATTEMPT.toString());
+
+    // publish component as separate entity.
+    publishComponents(service.getComponents());
+  }
+
+  public void serviceAttemptUpdated(Service service) {
+    TimelineEntity entity = createServiceAttemptEntity(service.getId());
+    entity.addInfo(ServiceTimelineMetricsConstants.QUICK_LINKS,
+        service.getQuicklinks());
+    putEntity(entity);
+  }
+
+  public void serviceAttemptUnregistered(ServiceContext context,
+      String diagnostics) {
+    TimelineEntity entity = createServiceAttemptEntity(
+        context.attemptId.getApplicationId().toString());
+    Map<String, Object> entityInfos = new HashMap<String, Object>();
+    entityInfos.put(ServiceTimelineMetricsConstants.STATE,
+        FinalApplicationStatus.ENDED);
+    entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
+    entity.addInfo(entityInfos);
+
+    // add an event
+    TimelineEvent finishEvent = new TimelineEvent();
+    finishEvent
+        .setId(ServiceTimelineEvent.SERVICE_ATTEMPT_UNREGISTERED.toString());
+    finishEvent.setTimestamp(System.currentTimeMillis());
+    entity.addEvent(finishEvent);
+
+    putEntity(entity);
+  }
+
+  public void componentInstanceStarted(Container container,
+      ComponentInstance instance) {
+
+    TimelineEntity entity = createComponentInstanceEntity(container.getId());
+    entity.setCreatedTime(container.getLaunchTime().getTime());
+
+    // create info keys
+    Map<String, Object> entityInfos = new HashMap<String, Object>();
+    entityInfos.put(ServiceTimelineMetricsConstants.BARE_HOST,
+        container.getBareHost());
+    entityInfos.put(ServiceTimelineMetricsConstants.STATE,
+        container.getState().toString());
+    entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_TIME,
+        container.getLaunchTime().getTime());
+    entityInfos.put(ServiceTimelineMetricsConstants.COMPONENT_NAME,
+        instance.getCompName());
+    entityInfos.put(ServiceTimelineMetricsConstants.COMPONENT_INSTANCE_NAME,
+        instance.getCompInstanceName());
+    entity.addInfo(entityInfos);
+
+    // add an event
+    TimelineEvent startEvent = new TimelineEvent();
+    startEvent
+        .setId(ServiceTimelineEvent.COMPONENT_INSTANCE_REGISTERED.toString());
+    startEvent.setTimestamp(container.getLaunchTime().getTime());
+    entity.addEvent(startEvent);
+
+    putEntity(entity);
+  }
+
+  public void componentInstanceFinished(ComponentInstance instance,
+      int exitCode, ContainerState state, String diagnostics) {
+    TimelineEntity entity = createComponentInstanceEntity(
+        instance.getContainer().getId().toString());
+
+    // create info keys
+    Map<String, Object> entityInfos = new HashMap<String, Object>();
+    entityInfos.put(ServiceTimelineMetricsConstants.EXIT_STATUS_CODE,
+        exitCode);
+    entityInfos.put(DIAGNOSTICS_INFO, diagnostics);
+    entityInfos.put(ServiceTimelineMetricsConstants.STATE, state);
+    entity.addInfo(entityInfos);
+
+    // add an event
+    TimelineEvent startEvent = new TimelineEvent();
+    startEvent
+        .setId(ServiceTimelineEvent.COMPONENT_INSTANCE_UNREGISTERED.toString());
+    startEvent.setTimestamp(System.currentTimeMillis());
+    entity.addEvent(startEvent);
+
+    putEntity(entity);
+  }
+
+  public void componentInstanceUpdated(Container container) {
+    TimelineEntity entity = createComponentInstanceEntity(container.getId());
+
+    // create info keys
+    Map<String, Object> entityInfos = new HashMap<String, Object>();
+    entityInfos.put(ServiceTimelineMetricsConstants.IP, container.getIp());
+    entityInfos.put(ServiceTimelineMetricsConstants.HOSTNAME,
+        container.getHostname());
+    entityInfos.put(ServiceTimelineMetricsConstants.STATE,
+        container.getState().toString());
+    entity.addInfo(entityInfos);
+
+    TimelineEvent updateEvent = new TimelineEvent();
+    updateEvent
+        .setId(ServiceTimelineEvent.COMPONENT_INSTANCE_UPDATED.toString());
+    updateEvent.setTimestamp(System.currentTimeMillis());
+    entity.addEvent(updateEvent);
+
+    putEntity(entity);
+  }
+
+  private void publishComponents(List<Component> components) {
+    long currentTimeMillis = System.currentTimeMillis();
+    for (Component component : components) {
+      TimelineEntity entity = createComponentEntity(component.getName());
+      entity.setCreatedTime(currentTimeMillis);
+
+      // create info keys
+      Map<String, Object> entityInfos = new HashMap<String, Object>();
+      entityInfos.put(ServiceTimelineMetricsConstants.ARTIFACT_ID,
+          component.getArtifact().getId());
+      entityInfos.put(ServiceTimelineMetricsConstants.ARTIFACT_TYPE,
+          component.getArtifact().getType().toString());
+      if (component.getResource().getProfile() != null) {
+        entityInfos.put(ServiceTimelineMetricsConstants.RESOURCE_PROFILE,
+            component.getResource().getProfile());
+      }
+      entityInfos.put(ServiceTimelineMetricsConstants.RESOURCE_CPU,
+          component.getResource().getCpus());
+      entityInfos.put(ServiceTimelineMetricsConstants.RESOURCE_MEMORY,
+          component.getResource().getMemory());
+
+      if (component.getLaunchCommand() != null) {
+        entityInfos.put(ServiceTimelineMetricsConstants.LAUNCH_COMMAND,
+            component.getLaunchCommand());
+      }
+      entityInfos.put(ServiceTimelineMetricsConstants.RUN_PRIVILEGED_CONTAINER,
+          component.getRunPrivilegedContainer().toString());
+      if (component.getPlacementPolicy() != null) {
+        entityInfos.put(ServiceTimelineMetricsConstants.PLACEMENT_POLICY,
+            component.getPlacementPolicy().getLabel());
+      }
+      entity.addInfo(entityInfos);
+
+      putEntity(entity);
+
+      // publish component specific configurations
+      publishUserConf(component.getConfiguration(), component.getName(),
+          ServiceTimelineEntityType.COMPONENT.toString());
+    }
+  }
+
+  private void publishUserConf(Configuration configuration,
+      String entityId, String entityType) {
+    populateTimelineEntity(configuration.getProperties().entrySet().iterator(),
+        entityId, entityType);
+
+    populateTimelineEntity(configuration.getEnv().entrySet().iterator(),
+        entityId, entityType);
+
+    for (ConfigFile configFile : configuration.getFiles()) {
+      populateTimelineEntity(configFile.getProps().entrySet().iterator(),
+          entityId, entityType);
+    }
+  }
+
+  private void populateTimelineEntity(Iterator<Entry<String, String>> iterator,
+      String entityId, String entityType) {
+    int configSize = 0;
+    TimelineEntity entity = createTimelineEntity(entityId, entityType);
+    while (iterator.hasNext()) {
+      Entry<String, String> entry = iterator.next();
+      int size = entry.getKey().length() + entry.getValue().length();
+      configSize += size;
+      // Configs are split into multiple entities if they exceed 100kb in size.
+      if (configSize > ATS_CONFIG_PUBLISH_SIZE_BYTES) {
+        if (entity.getConfigs().size() > 0) {
+          putEntity(entity);
+          entity = createTimelineEntity(entityId, entityType);
+        }
+        configSize = size;
+      }
+      entity.addConfig(entry.getKey(), entry.getValue());
+    }
+    if (configSize > 0) {
+      putEntity(entity);
+    }
+  }
+
+  /**
+   * Called from ServiceMetricsSink at regular interval of time.
+   * @param metrics of service or components
+   * @param entityId Id of entity
+   * @param entityType Type of entity
+   * @param timestamp
+   */
+  public void publishMetrics(Iterable<AbstractMetric> metrics, String entityId,
+      String entityType, long timestamp) {
+    TimelineEntity entity = createTimelineEntity(entityId, entityType);
+    Set<TimelineMetric> entityMetrics = new HashSet<TimelineMetric>();
+    for (AbstractMetric metric : metrics) {
+      TimelineMetric timelineMetric = new TimelineMetric();
+      timelineMetric.setId(metric.name());
+      timelineMetric.addValue(timestamp, metric.value());
+      entityMetrics.add(timelineMetric);
+    }
+    entity.setMetrics(entityMetrics);
+    putEntity(entity);
+  }
+
+  private TimelineEntity createServiceAttemptEntity(String serviceId) {
+    TimelineEntity entity = createTimelineEntity(serviceId,
+        ServiceTimelineEntityType.SERVICE_ATTEMPT.toString());
+    return entity;
+  }
+
+  private TimelineEntity createComponentInstanceEntity(String instanceId) {
+    TimelineEntity entity = createTimelineEntity(instanceId,
+        ServiceTimelineEntityType.COMPONENT_INSTANCE.toString());
+    return entity;
+  }
+
+  private TimelineEntity createComponentEntity(String componentId) {
+    TimelineEntity entity = createTimelineEntity(componentId,
+        ServiceTimelineEntityType.COMPONENT.toString());
+    return entity;
+  }
+
+  private TimelineEntity createTimelineEntity(String entityId,
+      String entityType) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId(entityId);
+    entity.setType(entityType);
+    return entity;
+  }
+
+  private void putEntity(TimelineEntity entity) {
+    try {
+      if (log.isDebugEnabled()) {
+        log.debug("Publishing the entity " + entity + ", JSON-style content: "
+            + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      }
+      if (timelineClient != null) {
+        timelineClient.putEntitiesAsync(entity);
+      } else {
+        log.error("Seems like client has been removed before the entity "
+            + "could be published for " + entity);
+      }
+    } catch (Exception e) {
+      log.error("Error when publishing entity " + entity, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/package-info.java
new file mode 100644
index 0000000..72f7842
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/timelineservice/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * ATS implementation
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.service.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java
new file mode 100644
index 0000000..2607c08
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ApplicationReportSerDeser.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.utils;
+
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+
+import java.io.IOException;
+
+/**
+ * Persistence of {@link SerializedApplicationReport}
+ * 
+ */
+public class ApplicationReportSerDeser
+    extends JsonSerDeser<SerializedApplicationReport> {
+  public ApplicationReportSerDeser() {
+    super(SerializedApplicationReport.class);
+  }
+
+
+  private static final ApplicationReportSerDeser
+      staticinstance = new ApplicationReportSerDeser();
+
+  /**
+   * Convert an instance to a JSON string -sync access to a shared ser/deser
+   * object instance
+   * @param instance object to convert
+   * @return a JSON string description
+   * @throws JsonParseException parse problems
+   * @throws JsonMappingException O/J mapping problems
+   */
+  public static String toString(SerializedApplicationReport instance)
+      throws IOException, JsonGenerationException, JsonMappingException {
+    synchronized (staticinstance) {
+      return staticinstance.toJson(instance);
+    }
+  }
+ 
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java
new file mode 100644
index 0000000..86896b2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ClientRegistryBinder.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
+import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.registry.client.binding.RegistryPathUtils.encodeForRegistry;
+import static org.apache.hadoop.registry.client.binding.RegistryUtils.convertUsername;
+import static org.apache.hadoop.registry.client.binding.RegistryUtils.getCurrentUsernameUnencoded;
+import static org.apache.hadoop.registry.client.binding.RegistryUtils.servicePath;
+
+/**
+ * Generic code to get the URLs for clients via the registry
+ */
+public class ClientRegistryBinder {
+  private static final Logger log =
+      LoggerFactory.getLogger(ClientRegistryBinder.class);
+
+  private final RegistryOperations operations;
+
+  public ClientRegistryBinder(RegistryOperations operations) {
+    this.operations = operations;
+  }
+
+  /**
+   * Buld the user path -switches to the system path if the user is "".
+   * It also cross-converts the username to ascii via punycode
+   * @param username username or ""
+   * @return the path to the user
+   */
+  public static String homePathForUser(String username) {
+    Preconditions.checkArgument(username != null, "null user");
+
+    // catch recursion
+    if (username.startsWith(RegistryConstants.PATH_USERS)) {
+      return username;
+    }
+
+    if (username.isEmpty()) {
+      return RegistryConstants.PATH_SYSTEM_SERVICES;
+    }
+
+    // convert username to registry name
+    String convertedName = convertUsername(username);
+
+    return RegistryPathUtils.join(RegistryConstants.PATH_USERS,
+        encodeForRegistry(convertedName));
+  }
+
+  /**
+   * Get the current username, before any encoding has been applied.
+   * @return the current user from the kerberos identity, falling back
+   * to the user and/or env variables.
+   */
+  public static String currentUsernameUnencoded() {
+    String env_hadoop_username = System.getenv(
+        RegistryInternalConstants.HADOOP_USER_NAME);
+    return getCurrentUsernameUnencoded(env_hadoop_username);
+  }
+
+  /**
+   * Qualify a user.
+   * <ol>
+   *   <li> <code>"~"</code> maps to user home path home</li>
+   *   <li> <code>"~user"</code> maps to <code>/users/$user</code></li>
+   *   <li> <code>"/"</code> maps to <code>/services/</code></li>
+   * </ol>
+   * @param user the username
+   * @return the base path
+   */
+  public static String qualifyUser(String user) {
+    // qualify the user
+    String t = user.trim();
+    if (t.startsWith("/")) {
+      // already resolved
+      return t;
+    } else if (t.equals("~")) {
+      // self
+      return currentUsernameUnencoded();
+    } else if (t.startsWith("~")) {
+      // another user
+      // convert username to registry name
+      String convertedName = convertUsername(t.substring(1));
+
+      return RegistryPathUtils.join(RegistryConstants.PATH_USERS,
+          encodeForRegistry(convertedName));
+    } else {
+      return "/" + t;
+    }
+  }
+
+  /**
+   * Look up an external REST API
+   * @param user user which will be qualified as per {@link #qualifyUser(String)}
+   * @param serviceClass service class
+   * @param instance instance name
+   * @param api API
+   * @return the API, or an exception is raised.
+   * @throws IOException
+   */
+  public String lookupExternalRestAPI(String user,
+      String serviceClass,
+      String instance,
+      String api)
+      throws IOException {
+    String qualified = qualifyUser(user);
+    String path = servicePath(qualified, serviceClass, instance);
+    String restAPI = resolveExternalRestAPI(api, path);
+    if (restAPI == null) {
+      throw new PathNotFoundException(path + " API " + api);
+    }
+    return restAPI;
+  }
+
+  /**
+   * Resolve a service record then return an external REST API exported it.
+   *
+   * @param api API to resolve
+   * @param path path of the service record
+   * @return null if the record exists but the API is absent or it has no
+   * REST endpoints.
+   * @throws IOException resolution problems, as covered in
+   * {@link RegistryOperations#resolve(String)}
+   */
+  protected String resolveExternalRestAPI(String api, String path) throws
+      IOException {
+    ServiceRecord record = operations.resolve(path);
+    return lookupRestAPI(record, api, true);
+  }
+
+  /**
+   * Look up an external REST API endpoint
+   * @param record service record
+   * @param api URI of api
+   * @param external flag to indicate this is an external record
+   * @return the first endpoint of the implementation, or null if there
+   * is no entry for the API, implementation or it's the wrong type.
+   */
+  public static String lookupRestAPI(ServiceRecord record,
+      String api, boolean external) throws InvalidRecordException {
+    try {
+      String url = null;
+      Endpoint endpoint = getEndpoint(record, api, external);
+      List<String> addresses =
+          RegistryTypeUtils.retrieveAddressesUriType(endpoint);
+      if (addresses != null && !addresses.isEmpty()) {
+        url = addresses.get(0);
+      }
+      return url;
+    } catch (InvalidRecordException e) {
+      log.debug("looking for API {}", api, e);
+      return null;
+    }
+  }
+
+  /**
+   * Get an endpont by API
+   * @param record service record
+   * @param api API
+   * @param external flag to indicate this is an external record
+   * @return the endpoint or null
+   */
+  public static Endpoint getEndpoint(ServiceRecord record,
+      String api,
+      boolean external) {
+    return external ? record.getExternalEndpoint(api)
+                    : record.getInternalEndpoint(api);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java
new file mode 100644
index 0000000..9f0e5d4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Comparators.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.utils;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Some general comparators
+ */
+public class Comparators {
+
+  public static class LongComparator implements Comparator<Long>, Serializable {
+    @Override
+    public int compare(Long o1, Long o2) {
+      return o1.compareTo(o2);
+    }
+  }
+
+  public static class InvertedLongComparator
+      implements Comparator<Long>, Serializable {
+    @Override
+    public int compare(Long o1, Long o2) {
+      return o2.compareTo(o1);
+    }
+  }
+
+  /**
+   * Little template class to reverse any comparitor
+   * @param <CompareType> the type that is being compared
+   */
+  public static class ComparatorReverser<CompareType> implements Comparator<CompareType>,
+      Serializable {
+
+    final Comparator<CompareType> instance;
+
+    public ComparatorReverser(Comparator<CompareType> instance) {
+      this.instance = instance;
+    }
+
+    @Override
+    public int compare(CompareType first, CompareType second) {
+      return instance.compare(second, first);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigHelper.java
new file mode 100644
index 0000000..fe8cce8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigHelper.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.service.exceptions.BadConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.net.URL;
+import java.util.Map;
+
+/**
+ * Methods to aid in config, both in the Configuration class and
+ * with other parts of setting up Slider-initated processes.
+ *
+ * Some of the methods take an argument of a map iterable for their sources; this allows
+ * the same method
+ */
+public class ConfigHelper {
+  private static final Logger log = LoggerFactory.getLogger(ConfigHelper.class);
+
+  /**
+   * Set an entire map full of values
+   *
+   * @param config config to patch
+   * @param map map of data
+   * @param origin origin data
+   */
+  public static void addConfigMap(Configuration config,
+                                  Map<String, String> map,
+                                  String origin) throws BadConfigException {
+    addConfigMap(config, map.entrySet(), origin);
+  }
+
+  /**
+   * Set an entire map full of values
+   *
+   * @param config config to patch
+   * @param map map of data
+   * @param origin origin data
+   */
+  public static void addConfigMap(Configuration config,
+                                  Iterable<Map.Entry<String, String>> map,
+                                  String origin) throws BadConfigException {
+    for (Map.Entry<String, String> mapEntry : map) {
+      String key = mapEntry.getKey();
+      String value = mapEntry.getValue();
+      if (value == null) {
+        throw new BadConfigException("Null value for property " + key);
+      }
+      config.set(key, value, origin);
+    }
+  }
+
+  /**
+   * Convert to an XML string
+   * @param conf configuration
+   * @return conf
+   * @throws IOException
+   */
+  public static String toXml(Configuration conf) throws IOException {
+    StringWriter writer = new StringWriter();
+    conf.writeXml(writer);
+    return writer.toString();
+  }
+
+
+  /**
+   * Register a resource as a default resource.
+   * Do not attempt to use this unless you understand that the
+   * order in which default resources are loaded affects the outcome,
+   * and that subclasses of Configuration often register new default
+   * resources
+   * @param resource the resource name
+   * @return the URL or null
+   */
+  public static URL registerDefaultResource(String resource) {
+    URL resURL = getResourceUrl(resource);
+    if (resURL != null) {
+      Configuration.addDefaultResource(resource);
+    }
+    return resURL;
+  }
+
+  /**
+   * Load a configuration from a resource on this classpath.
+   * If the resource is not found, an empty configuration is returned
+   * @param resource the resource name
+   * @return the loaded configuration.
+   */
+  public static Configuration loadFromResource(String resource) {
+    Configuration conf = new Configuration(false);
+    URL resURL = getResourceUrl(resource);
+    if (resURL != null) {
+      log.debug("loaded resources from {}", resURL);
+      conf.addResource(resource);
+    } else{
+      log.debug("failed to find {} on the classpath", resource);
+    }
+    return conf;
+
+  }
+
+  /**
+   * Get the URL to a resource, null if not on the CP
+   * @param resource resource to look for
+   * @return the URL or null
+   */
+  public static URL getResourceUrl(String resource) {
+    return ConfigHelper.class.getClassLoader()
+                                  .getResource(resource);
+  }
+
+  /**
+   * This goes through the keyset of one configuration and retrieves each value
+   * from a value source -a different or the same configuration. This triggers
+   * the property resolution process of the value, resolving any variables against
+   * in-config or inherited configurations
+   * @param keysource source of keys
+   * @param valuesource the source of values
+   * @return a new configuration where <code>foreach key in keysource, get(key)==valuesource.get(key)</code>
+   */
+  public static Configuration resolveConfiguration(
+      Iterable<Map.Entry<String, String>> keysource,
+      Configuration valuesource) {
+    Configuration result = new Configuration(false);
+    for (Map.Entry<String, String> entry : keysource) {
+      String key = entry.getKey();
+      String value = valuesource.get(key);
+      Preconditions.checkState(value != null,
+          "no reference for \"%s\" in values", key);
+      result.set(key, value);
+    }
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java
new file mode 100644
index 0000000..a969be9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ConfigUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.utils;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.service.api.records.ConfigFormat;
+import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ConfigUtils {
+  public static final String TEMPLATE_FILE = "template.file";
+
+  public static String replaceProps(Map<String, String> config, String content) {
+    Map<String, String> tokens = new HashMap<>();
+    for (Entry<String, String> entry : config.entrySet()) {
+      tokens.put("${" + entry.getKey() + "}", entry.getValue());
+      tokens.put("{{" + entry.getKey() + "}}", entry.getValue());
+    }
+    String value = content;
+    for (Map.Entry<String,String> token : tokens.entrySet()) {
+      value = value.replaceAll(Pattern.quote(token.getKey()),
+          Matcher.quoteReplacement(token.getValue()));
+    }
+    return value;
+  }
+
+  public static Map<String, String> replacePropsInConfig(
+      Map<String, String> config, Map<String, String> env) {
+    Map<String, String> tokens = new HashMap<>();
+    for (Entry<String, String> entry : env.entrySet()) {
+      tokens.put("${" + entry.getKey() + "}", entry.getValue());
+    }
+    Map<String, String> newConfig = new HashMap<>();
+    for (Entry<String, String> entry : config.entrySet()) {
+      String value = entry.getValue();
+      for (Map.Entry<String,String> token : tokens.entrySet()) {
+        value = value.replaceAll(Pattern.quote(token.getKey()),
+            Matcher.quoteReplacement(token.getValue()));
+      }
+      newConfig.put(entry.getKey(), entry.getValue());
+    }
+    return newConfig;
+  }
+
+  public static void prepConfigForTemplateOutputter(ConfigFormat configFormat,
+      Map<String, String> config, SliderFileSystem fileSystem,
+      String clusterName, String fileName) throws IOException {
+    if (!configFormat.equals(ConfigFormat.TEMPLATE)) {
+      return;
+    }
+    Path templateFile = null;
+    if (config.containsKey(TEMPLATE_FILE)) {
+      templateFile = fileSystem.buildResourcePath(config.get(TEMPLATE_FILE));
+      if (!fileSystem.isFile(templateFile)) {
+        templateFile = fileSystem.buildResourcePath(clusterName,
+            config.get(TEMPLATE_FILE));
+      }
+      if (!fileSystem.isFile(templateFile)) {
+        throw new IOException("config specified template file " + config
+            .get(TEMPLATE_FILE) + " but " + templateFile + " doesn't exist");
+      }
+    }
+    if (templateFile == null && fileName != null) {
+      templateFile = fileSystem.buildResourcePath(fileName);
+      if (!fileSystem.isFile(templateFile)) {
+        templateFile = fileSystem.buildResourcePath(clusterName,
+            fileName);
+      }
+    }
+    if (fileSystem.isFile(templateFile)) {
+      config.put("content", fileSystem.cat(templateFile));
+    } else {
+      config.put("content", "");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java
new file mode 100644
index 0000000..281e1dfe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.service.conf.SliderExitCodes;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
+import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
+import org.apache.hadoop.yarn.service.exceptions.ErrorStrings;
+import org.apache.hadoop.yarn.service.exceptions.SliderException;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CoreFileSystem {
+  private static final Logger
+    log = LoggerFactory.getLogger(CoreFileSystem.class);
+
+  private static final String UTF_8 = "UTF-8";
+
+  protected final FileSystem fileSystem;
+  protected final Configuration configuration;
+
+  public CoreFileSystem(FileSystem fileSystem, Configuration configuration) {
+    Preconditions.checkNotNull(fileSystem,
+                               "Cannot create a CoreFileSystem with a null FileSystem");
+    Preconditions.checkNotNull(configuration,
+                               "Cannot create a CoreFileSystem with a null Configuration");
+    this.fileSystem = fileSystem;
+    this.configuration = configuration;
+  }
+
+  public CoreFileSystem(Configuration configuration) throws IOException {
+    Preconditions.checkNotNull(configuration,
+                               "Cannot create a CoreFileSystem with a null Configuration");
+    this.fileSystem = FileSystem.get(configuration);
+    this.configuration = configuration;
+  }
+  
+  /**
+   * Get the temp path for this cluster
+   * @param clustername name of the cluster
+   * @return path for temp files (is not purged)
+   */
+  public Path getTempPathForCluster(String clustername) {
+    Path clusterDir = buildClusterDirPath(clustername);
+    return new Path(clusterDir, YarnServiceConstants.TMP_DIR_PREFIX);
+  }
+
+  /**
+   * Returns the underlying FileSystem for this object.
+   *
+   * @return filesystem
+   */
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb =
+      new StringBuilder("CoreFileSystem{");
+    sb.append("fileSystem=").append(fileSystem.getUri());
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Build up the path string for a cluster instance -no attempt to
+   * create the directory is made
+   *
+   * @param clustername name of the cluster
+   * @return the path for persistent data
+   */
+  public Path buildClusterDirPath(String clustername) {
+    Preconditions.checkNotNull(clustername);
+    Path path = getBaseApplicationPath();
+    return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/" + clustername);
+  }
+
+
+  /**
+   * Build up the path string for keytab install location -no attempt to
+   * create the directory is made
+   *
+   * @return the path for keytab
+   */
+  public Path buildKeytabInstallationDirPath(String keytabFolder) {
+    Preconditions.checkNotNull(keytabFolder);
+    Path path = getBaseApplicationPath();
+    return new Path(path, YarnServiceConstants.KEYTAB_DIR + "/" + keytabFolder);
+  }
+
+  /**
+   * Build up the path string for keytab install location -no attempt to
+   * create the directory is made
+   *
+   * @return the path for keytab installation location
+   */
+  public Path buildKeytabPath(String keytabDir, String keytabName, String clusterName) {
+    Path homePath = getHomeDirectory();
+    Path baseKeytabDir;
+    if (keytabDir != null) {
+      baseKeytabDir = new Path(homePath, keytabDir);
+    } else {
+      baseKeytabDir = new Path(buildClusterDirPath(clusterName),
+                               YarnServiceConstants.KEYTAB_DIR);
+    }
+    return keytabName == null ? baseKeytabDir :
+        new Path(baseKeytabDir, keytabName);
+  }
+
+  /**
+   * Build up the path string for resource install location -no attempt to
+   * create the directory is made
+   *
+   * @return the path for resource
+   */
+  public Path buildResourcePath(String resourceFolder) {
+    Preconditions.checkNotNull(resourceFolder);
+    Path path = getBaseApplicationPath();
+    return new Path(path, YarnServiceConstants.RESOURCE_DIR + "/" + resourceFolder);
+  }
+
+  /**
+   * Build up the path string for resource install location -no attempt to
+   * create the directory is made
+   *
+   * @return the path for resource
+   */
+  public Path buildResourcePath(String dirName, String fileName) {
+    Preconditions.checkNotNull(dirName);
+    Preconditions.checkNotNull(fileName);
+    Path path = getBaseApplicationPath();
+    return new Path(path, YarnServiceConstants.RESOURCE_DIR + "/" + dirName + "/" + fileName);
+  }
+
+  /**
+   * Create a directory with the given permissions.
+   *
+   * @param dir          directory
+   * @param clusterPerms cluster permissions
+   * @throws IOException  IO problem
+   * @throws BadClusterStateException any cluster state problem
+   */
+  @SuppressWarnings("deprecation")
+  public void createWithPermissions(Path dir, FsPermission clusterPerms) throws
+          IOException,
+          BadClusterStateException {
+    if (fileSystem.isFile(dir)) {
+      // HADOOP-9361 shows some filesystems don't correctly fail here
+      throw new BadClusterStateException(
+              "Cannot create a directory over a file %s", dir);
+    }
+    log.debug("mkdir {} with perms {}", dir, clusterPerms);
+    //no mask whatoever
+    fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000");
+    fileSystem.mkdirs(dir, clusterPerms);
+    //and force set it anyway just to make sure
+    fileSystem.setPermission(dir, clusterPerms);
+  }
+
+  /**
+   * Verify that the cluster directory is not present
+   *
+   * @param clustername      name of the cluster
+   * @param clusterDirectory actual directory to look for
+   * @throws IOException trouble with FS
+   * @throws SliderException If the directory exists
+   */
+  public void verifyClusterDirectoryNonexistent(String clustername,
+                                                Path clusterDirectory)
+      throws IOException, SliderException {
+    if (fileSystem.exists(clusterDirectory)) {
+      throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS,
+              ErrorStrings.PRINTF_E_INSTANCE_ALREADY_EXISTS, clustername,
+              clusterDirectory);
+    }
+  }
+  /**
+   * Verify that the given directory is not present
+   *
+   * @param clusterDirectory actual directory to look for
+   * @throws IOException    trouble with FS
+   * @throws SliderException If the directory exists
+   */
+  public void verifyDirectoryNonexistent(Path clusterDirectory) throws
+          IOException,
+      SliderException {
+    if (fileSystem.exists(clusterDirectory)) {
+      
+      log.error("Dir {} exists: {}",
+                clusterDirectory,
+                listFSDir(clusterDirectory));
+      throw new SliderException(SliderExitCodes.EXIT_INSTANCE_EXISTS,
+              ErrorStrings.PRINTF_E_INSTANCE_DIR_ALREADY_EXISTS,
+              clusterDirectory);
+    }
+  }
+
+  /**
+   * Verify that a user has write access to a directory.
+   * It does this by creating then deleting a temp file
+   *
+   * @param dirPath actual directory to look for
+   * @throws FileNotFoundException file not found
+   * @throws IOException  trouble with FS
+   * @throws BadClusterStateException if the directory is not writeable
+   */
+  public void verifyDirectoryWriteAccess(Path dirPath) throws IOException,
+      SliderException {
+    verifyPathExists(dirPath);
+    Path tempFile = new Path(dirPath, "tmp-file-for-checks");
+    try {
+      FSDataOutputStream out ;
+      out = fileSystem.create(tempFile, true);
+      IOUtils.closeStream(out);
+      fileSystem.delete(tempFile, false);
+    } catch (IOException e) {
+      log.warn("Failed to create file {}: {}", tempFile, e);
+      throw new BadClusterStateException(e,
+              "Unable to write to directory %s : %s", dirPath, e.toString());
+    }
+  }
+
+  /**
+   * Verify that a path exists
+   * @param path path to check
+   * @throws FileNotFoundException file not found
+   * @throws IOException  trouble with FS
+   */
+  public void verifyPathExists(Path path) throws IOException {
+    if (!fileSystem.exists(path)) {
+      throw new FileNotFoundException(path.toString());
+    }
+  }
+
+  /**
+   * Verify that a path exists
+   * @param path path to check
+   * @throws FileNotFoundException file not found or is not a file
+   * @throws IOException  trouble with FS
+   */
+  public void verifyFileExists(Path path) throws IOException {
+    FileStatus status = fileSystem.getFileStatus(path);
+
+    if (!status.isFile()) {
+      throw new FileNotFoundException("Not a file: " + path.toString());
+    }
+  }
+
+  /**
+   * Given a path, check if it exists and is a file
+   * 
+   * @param path
+   *          absolute path to the file to check
+   * @return true if and only if path exists and is a file, false for all other
+   *          reasons including if file check throws IOException
+   */
+  public boolean isFile(Path path) {
+    boolean isFile = false;
+    try {
+      FileStatus status = fileSystem.getFileStatus(path);
+      if (status.isFile()) {
+        isFile = true;
+      }
+    } catch (IOException e) {
+      // ignore, isFile is already set to false
+    }
+    return isFile;
+  }
+
+  /**
+   * Get the base path
+   *
+   * @return the base path optionally configured by 
+   * {@link YarnServiceConf#YARN_SERVICE_BASE_PATH}
+   */
+  public Path getBaseApplicationPath() {
+    String configuredBasePath = configuration
+        .get(YarnServiceConf.YARN_SERVICE_BASE_PATH,
+            getHomeDirectory() + "/" + YarnServiceConstants.SERVICE_BASE_DIRECTORY);
+    return new Path(configuredBasePath);
+  }
+
+  /**
+   * Get slider dependency parent dir in HDFS
+   * 
+   * @return the parent dir path of slider.tar.gz in HDFS
+   */
+  public Path getDependencyPath() {
+    String parentDir = YarnServiceConstants.DEPENDENCY_DIR;
+    return new Path(String.format(parentDir, VersionInfo.getVersion()));
+  }
+
+  /**
+   * Get slider.tar.gz absolute filepath in HDFS
+   * 
+   * @return the absolute path to slider.tar.gz in HDFS
+   */
+  public Path getDependencyTarGzip() {
+    Path dependencyLibAmPath = getDependencyPath();
+    Path dependencyLibTarGzip = new Path(
+        dependencyLibAmPath.toUri().toString(),
+        YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_NAME
+            + YarnServiceConstants.DEPENDENCY_TAR_GZ_FILE_EXT);
+    return dependencyLibTarGzip;
+  }
+
+  public Path getHomeDirectory() {
+    return fileSystem.getHomeDirectory();
+  }
+
+  /**
+   * Create an AM resource from the
+   *
+   * @param destPath     dest path in filesystem
+   * @param resourceType resource type
+   * @return the local resource for AM
+   */
+  public LocalResource createAmResource(Path destPath, LocalResourceType resourceType) throws IOException {
+    FileStatus destStatus = fileSystem.getFileStatus(destPath);
+    LocalResource amResource = Records.newRecord(LocalResource.class);
+    amResource.setType(resourceType);
+    // Set visibility of the resource
+    // Setting to most private option
+    amResource.setVisibility(LocalResourceVisibility.APPLICATION);
+    // Set the resource to be copied over
+    amResource.setResource(
+        URL.fromPath(fileSystem.resolvePath(destStatus.getPath())));
+    // Set timestamp and length of file so that the framework
+    // can do basic sanity checks for the local resource
+    // after it has been copied over to ensure it is the same
+    // resource the client intended to use with the service
+    amResource.setTimestamp(destStatus.getModificationTime());
+    amResource.setSize(destStatus.getLen());
+    return amResource;
+  }
+
+  /**
+   * Register all files under a fs path as a directory to push out
+   *
+   * @param srcDir          src dir
+   * @param destRelativeDir dest dir (no trailing /)
+   * @return the map of entries
+   */
+  public Map<String, LocalResource> submitDirectory(Path srcDir, String destRelativeDir) throws IOException {
+    //now register each of the files in the directory to be
+    //copied to the destination
+    FileStatus[] fileset = fileSystem.listStatus(srcDir);
+    Map<String, LocalResource> localResources =
+            new HashMap<String, LocalResource>(fileset.length);
+    for (FileStatus entry : fileset) {
+
+      LocalResource resource = createAmResource(entry.getPath(),
+              LocalResourceType.FILE);
+      String relativePath = destRelativeDir + "/" + entry.getPath().getName();
+      localResources.put(relativePath, resource);
+    }
+    return localResources;
+  }
+
+  /**
+   * Submit a JAR containing a specific class, returning
+   * the resource to be mapped in
+   *
+   * @param clazz   class to look for
+   * @param subdir  subdirectory (expected to end in a "/")
+   * @param jarName <i>At the destination</i>
+   * @return the local resource ref
+   * @throws IOException trouble copying to HDFS
+   */
+  public LocalResource submitJarWithClass(Class clazz, Path tempPath, String subdir, String jarName)
+          throws IOException, SliderException {
+    File localFile = SliderUtils.findContainingJarOrFail(clazz);
+    return submitFile(localFile, tempPath, subdir, jarName);
+  }
+
+  /**
+   * Submit a local file to the filesystem references by the instance's cluster
+   * filesystem
+   *
+   * @param localFile    filename
+   * @param subdir       subdirectory (expected to end in a "/")
+   * @param destFileName destination filename
+   * @return the local resource ref
+   * @throws IOException trouble copying to HDFS
+   */
+  public LocalResource submitFile(File localFile, Path tempPath, String subdir, String destFileName)
+      throws IOException {
+    Path src = new Path(localFile.toString());
+    Path subdirPath = new Path(tempPath, subdir);
+    fileSystem.mkdirs(subdirPath);
+    Path destPath = new Path(subdirPath, destFileName);
+    log.debug("Copying {} (size={} bytes) to {}", localFile, localFile.length(), destPath);
+
+    fileSystem.copyFromLocalFile(false, true, src, destPath);
+
+    // Set the type of resource - file or archive
+    // archives are untarred at destination
+    // we don't need the jar file to be untarred for now
+    return createAmResource(destPath, LocalResourceType.FILE);
+  }
+
+  /**
+   * Submit the AM tar.gz resource referenced by the instance's cluster
+   * filesystem. Also, update the providerResources object with the new
+   * resource.
+   * 
+   * @param providerResources
+   *          the provider resource map to be updated
+   * @throws IOException
+   *           trouble copying to HDFS
+   */
+  public void submitTarGzipAndUpdate(
+      Map<String, LocalResource> providerResources) throws IOException,
+      BadClusterStateException {
+    Path dependencyLibTarGzip = getDependencyTarGzip();
+    LocalResource lc = createAmResource(dependencyLibTarGzip,
+        LocalResourceType.ARCHIVE);
+    providerResources.put(YarnServiceConstants.DEPENDENCY_LOCALIZED_DIR_LINK, lc);
+  }
+
+  public void copyLocalFileToHdfs(File localPath,
+      Path destPath, FsPermission fp)
+      throws IOException {
+    if (localPath == null || destPath == null) {
+      throw new IOException("Either localPath or destPath is null");
+    }
+    fileSystem.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,
+        "000");
+    fileSystem.mkdirs(destPath.getParent(), fp);
+    log.info("Copying file {} to {}", localPath.toURI(),
+        fileSystem.getScheme() + ":/" + destPath.toUri());
+    
+    fileSystem.copyFromLocalFile(false, true, new Path(localPath.getPath()),
+        destPath);
+    // set file permissions of the destPath
+    fileSystem.setPermission(destPath, fp);
+  }
+
+  public void copyHdfsFileToLocal(Path hdfsPath, File destFile)
+      throws IOException {
+    if (hdfsPath == null || destFile == null) {
+      throw new IOException("Either hdfsPath or destPath is null");
+    }
+    log.info("Copying file {} to {}", hdfsPath.toUri(), destFile.toURI());
+
+    Path destPath = new Path(destFile.getPath());
+    fileSystem.copyToLocalFile(hdfsPath, destPath);
+  }
+
+  /**
+   * list entries in a filesystem directory
+   *
+   * @param path directory
+   * @return a listing, one to a line
+   * @throws IOException
+   */
+  public String listFSDir(Path path) throws IOException {
+    FileStatus[] stats = fileSystem.listStatus(path);
+    StringBuilder builder = new StringBuilder();
+    for (FileStatus stat : stats) {
+      builder.append(stat.getPath().toString())
+              .append("\t")
+              .append(stat.getLen())
+              .append("\n");
+    }
+    return builder.toString();
+  }
+
+  public String cat(Path path) throws IOException {
+    FileStatus status = fileSystem.getFileStatus(path);
+    byte[] b = new byte[(int) status.getLen()];
+    FSDataInputStream in = null;
+    try {
+      in = fileSystem.open(path);
+      int count = in.read(b);
+      return new String(b, 0, count, UTF_8);
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07469f9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java
new file mode 100644
index 0000000..6fadfd3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/Duration.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.utils;
+
+import java.io.Closeable;
+
+/**
+ * A duration in milliseconds. This class can be used
+ * to count time, and to be polled to see if a time limit has
+ * passed.
+ */
+public class Duration implements Closeable {
+  public long start, finish;
+  public final long limit;
+
+  /**
+   * Create a duration instance with a limit of 0
+   */
+  public Duration() {
+    this(0);
+  }
+
+  /**
+   * Create a duration with a limit specified in millis
+   * @param limit duration in milliseconds
+   */
+  public Duration(long limit) {
+    this.limit = limit;
+  }
+
+  /**
+   * Start
+   * @return self
+   */
+  public Duration start() {
+    start = now();
+    return this;
+  }
+
+  /**
+   * The close operation relays to {@link #finish()}.
+   * Implementing it allows Duration instances to be automatically
+   * finish()'d in Java7 try blocks for when used in measuring durations.
+   */
+  @Override
+  public final void close() {
+    finish();
+  }
+
+  public void finish() {
+    finish = now();
+  }
+
+  protected long now() {
+    return System.nanoTime()/1000000;
+  }
+
+  public long getInterval() {
+    return finish - start;
+  }
+
+  /**
+   * return true if the limit has been exceeded
+   * @return true if a limit was set and the current time
+   * exceeds it.
+   */
+  public boolean getLimitExceeded() {
+    return limit >= 0 && ((now() - start) > limit);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("Duration");
+     if (finish >= start) {
+       builder.append(" finished at ").append(getInterval()).append(" millis;");
+     } else {
+       if (start > 0) {
+         builder.append(" started but not yet finished;");
+       } else {
+         builder.append(" unstarted;");
+       }
+     }
+    if (limit > 0) {
+      builder.append(" limit: ").append(limit).append(" millis");
+      if (getLimitExceeded()) {
+        builder.append(" -  exceeded");
+      }
+    }
+    return  builder.toString();
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message