hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjs...@apache.org
Subject [27/27] hadoop git commit: YARN-3264. Created backing storage write interface and a POC only FS based storage implementation. Contributed by Vrushali C.
Date Thu, 05 Mar 2015 23:04:34 GMT
YARN-3264. Created backing storage write interface and a POC only FS based storage implementation.
Contributed by Vrushali C.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/821b68d0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/821b68d0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/821b68d0

Branch: refs/heads/YARN-2928
Commit: 821b68d05d246fd57d7b7286eb2ccc075ed1eae8
Parents: 848acd5
Author: Zhijie Shen <zjshen@apache.org>
Authored: Thu Mar 5 15:03:30 2015 -0800
Committer: Zhijie Shen <zjshen@apache.org>
Committed: Thu Mar 5 15:03:30 2015 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../timelineservice/TimelineWriteResponse.java  | 170 +++++++++++++++++++
 .../hadoop/yarn/conf/YarnConfiguration.java     |   2 +
 .../distributedshell/TestDistributedShell.java  |  56 +++++-
 .../aggregator/TimelineAggregator.java          |  43 +++--
 .../storage/FileSystemTimelineWriterImpl.java   | 144 ++++++++++++++++
 .../storage/TimelineAggregationTrack.java       |  28 +++
 .../timelineservice/storage/TimelineWriter.java |  66 +++++++
 .../aggregator/TestTimelineAggregator.java      |  23 ---
 .../TestFileSystemTimelineWriterImpl.java       |  79 +++++++++
 10 files changed, 574 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f5a60f7..df1061d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -26,6 +26,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3210. Refactored timeline aggregator according to new code
     organization proposed in YARN-3166. (Li Lu via zjshen)
 
+    YARN-3264. Created backing storage write interface and a POC only FS based
+    storage implementation. (Vrushali C via zjshen)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
new file mode 100644
index 0000000..82ecdbd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A class that holds a list of put errors. This is the response returned when a
+ * list of {@link TimelineEntity} objects is added to the timeline. If there are errors
+ * in storing individual entity objects, they will be indicated in the list of
+ * errors.
+ */
+@XmlRootElement(name = "response")
+@XmlAccessorType(XmlAccessType.NONE)
+@Public
+@Unstable
+public class TimelineWriteResponse {
+
+  private List<TimelineWriteError> errors = new ArrayList<TimelineWriteError>();
+
+  public TimelineWriteResponse() {
+
+  }
+
+  /**
+   * Get a list of {@link TimelineWriteError} instances
+   * 
+   * @return a list of {@link TimelineWriteError} instances
+   */
+  @XmlElement(name = "errors")
+  public List<TimelineWriteError> getErrors() {
+    return errors;
+  }
+
+  /**
+   * Add a single {@link TimelineWriteError} instance into the existing list
+   * 
+   * @param error
+   *          a single {@link TimelineWriteError} instance
+   */
+  public void addError(TimelineWriteError error) {
+    errors.add(error);
+  }
+
+  /**
+   * Add a list of {@link TimelineWriteError} instances into the existing list
+   * 
+   * @param errors
+   *          a list of {@link TimelineWriteError} instances
+   */
+  public void addErrors(List<TimelineWriteError> errors) {
+    this.errors.addAll(errors);
+  }
+
+  /**
+   * Set the list to the given list of {@link TimelineWriteError} instances
+   * 
+   * @param errors
+   *          a list of {@link TimelineWriteError} instances
+   */
+  public void setErrors(List<TimelineWriteError> errors) {
+    this.errors.clear();
+    this.errors.addAll(errors);
+  }
+
+  /**
+   * A class that holds the error code for one entity.
+   */
+  @XmlRootElement(name = "error")
+  @XmlAccessorType(XmlAccessType.NONE)
+  @Public
+  @Unstable
+  public static class TimelineWriteError {
+
+    /**
+     * Error code returned if an IOException is encountered when storing an
+     * entity.
+     */
+    public static final int IO_EXCEPTION = 1;
+
+    private String entityId;
+    private String entityType;
+    private int errorCode;
+
+    /**
+     * Get the entity Id
+     * 
+     * @return the entity Id
+     */
+    @XmlElement(name = "entity")
+    public String getEntityId() {
+      return entityId;
+    }
+
+    /**
+     * Set the entity Id
+     * 
+     * @param entityId
+     *          the entity Id
+     */
+    public void setEntityId(String entityId) {
+      this.entityId = entityId;
+    }
+
+    /**
+     * Get the entity type
+     * 
+     * @return the entity type
+     */
+    @XmlElement(name = "entitytype")
+    public String getEntityType() {
+      return entityType;
+    }
+
+    /**
+     * Set the entity type
+     * 
+     * @param entityType
+     *          the entity type
+     */
+    public void setEntityType(String entityType) {
+      this.entityType = entityType;
+    }
+
+    /**
+     * Get the error code
+     * 
+     * @return an error code
+     */
+    @XmlElement(name = "errorcode")
+    public int getErrorCode() {
+      return errorCode;
+    }
+
+    /**
+     * Set the error code to the given error code
+     * 
+     * @param errorCode
+     *          an error code
+     */
+    public void setErrorCode(int errorCode) {
+      this.errorCode = errorCode;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 25b808e..57fc378 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1253,6 +1253,8 @@ public class YarnConfiguration extends Configuration {
   public static final String TIMELINE_SERVICE_PREFIX =
       YARN_PREFIX + "timeline-service.";
 
+  public static final String TIMELINE_SERVICE_WRITER_CLASS =
+      TIMELINE_SERVICE_PREFIX + "writer.class";
 
   // mark app-history related configs @Private as application history is going
   // to be integrated into the timeline service

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 313dc97..ef69e4b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
@@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -237,6 +239,7 @@ public class TestDistributedShell {
 
     boolean verified = false;
     String errorMessage = "";
+    ApplicationId appId = null;
     while(!verified) {
       List<ApplicationReport> apps = yarnClient.getApplications();
       if (apps.size() == 0 ) {
@@ -244,6 +247,7 @@ public class TestDistributedShell {
         continue;
       }
       ApplicationReport appReport = apps.get(0);
+      appId = appReport.getApplicationId();
       if(appReport.getHost().equals("N/A")) {
         Thread.sleep(10);
         continue;
@@ -267,7 +271,7 @@ public class TestDistributedShell {
     if (!isTestingTimelineV2) {
       checkTimelineV1(haveDomain);
     } else {
-      checkTimelineV2(haveDomain);
+      checkTimelineV2(haveDomain, appId);
     }
   }
 
@@ -316,8 +320,54 @@ public class TestDistributedShell {
     }
   }
 
-  private void checkTimelineV2(boolean haveDomain) {
-    // TODO check timeline V2 here after we have a storage layer
+  private void checkTimelineV2(boolean haveDomain, ApplicationId appId) {
+    // For PoC check in /tmp/ YARN-3264
+    String tmpRoot = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+
+    File tmpRootFolder = new File(tmpRoot);
+    Assert.assertTrue(tmpRootFolder.isDirectory());
+
+    // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
+    String outputDirApp = tmpRoot + "/DS_APP_ATTEMPT/";
+
+    File entityFolder = new File(outputDirApp);
+    Assert.assertTrue(entityFolder.isDirectory());
+
+    // there will be at least one attempt, look for that file
+    String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp()
+        + "_000" + appId.getId() + "_000001"
+        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+    String appAttemptFileName = outputDirApp + appTimestampFileName;
+    File appAttemptFile = new File(appAttemptFileName);
+    Assert.assertTrue(appAttemptFile.exists());
+
+    String outputDirContainer = tmpRoot + "/DS_CONTAINER/";
+    File containerFolder = new File(outputDirContainer);
+    Assert.assertTrue(containerFolder.isDirectory());
+
+    String containerTimestampFileName = "container_"
+        + appId.getClusterTimestamp() + "_000" + appId.getId()
+        + "_01_000002.thist";
+    String containerFileName = outputDirContainer + containerTimestampFileName;
+    File containerFile = new File(containerFileName);
+    Assert.assertTrue(containerFile.exists());
+    String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId()
+        + "_";
+    deleteAppFiles(new File(outputDirApp), appTimeStamp);
+    deleteAppFiles(new File(outputDirContainer), appTimeStamp);
+    tmpRootFolder.delete();
+  }
+
+  private void deleteAppFiles(File rootDir, String appTimeStamp) {
+    boolean deleted = false;
+    File[] listOfFiles = rootDir.listFiles();
+    for (File f1 : listOfFiles) {
+      // list all attempts for this app and delete them
+      if (f1.getName().contains(appTimeStamp)){
+        deleted = f1.delete();
+        Assert.assertTrue(deleted);
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
index 4227712..dbd0895 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TimelineAggregator.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.aggregator;
 
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -26,12 +28,16 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.util.ReflectionUtils;
 /**
  * Service that handles writes to the timeline service and writes them to the
  * backing storage.
  *
- * Classes that extend this can putIfAbsent their own lifecycle management or
+ * Classes that extend this can add their own lifecycle management or
  * customization of request handling.
  */
 @Private
@@ -39,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 public abstract class TimelineAggregator extends CompositeService {
   private static final Log LOG = LogFactory.getLog(TimelineAggregator.class);
 
+  private TimelineWriter writer;
+
   public TimelineAggregator(String name) {
     super(name);
   }
@@ -46,6 +54,11 @@ public abstract class TimelineAggregator extends CompositeService {
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
+    writer = ReflectionUtils.newInstance(conf.getClass(
+        YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class,
+        TimelineWriter.class), conf);
+    writer.init(conf);
   }
 
   @Override
@@ -56,6 +69,11 @@ public abstract class TimelineAggregator extends CompositeService {
   @Override
   protected void serviceStop() throws Exception {
     super.serviceStop();
+    writer.stop();
+  }
+
+  public TimelineWriter getWriter() {
+    return writer;
   }
 
   /**
@@ -69,20 +87,17 @@ public abstract class TimelineAggregator extends CompositeService {
    *
    * @param entities entities to post
    * @param callerUgi the caller UGI
+   * @return the response that contains the result of the post.
    */
-  public void postEntities(TimelineEntities entities,
-      UserGroupInformation callerUgi) {
-    // Add this output temporarily for our prototype
-    // TODO remove this after we have an actual implementation
-    LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE");
-    LOG.info("postEntities(entities=" + entities + ", callerUgi=" +
-        callerUgi + ")");
-
-    // TODO implement
+  public TimelineWriteResponse postEntities(TimelineEntities entities,
+      UserGroupInformation callerUgi) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
-          callerUgi + ")");
+      LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
+      LOG.debug("postEntities(entities=" + entities + ", callerUgi="
+          + callerUgi + ")");
     }
+
+    return writer.write(entities);
   }
 
   /**
@@ -104,4 +119,4 @@ public abstract class TimelineAggregator extends CompositeService {
           callerUgi + ")");
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
new file mode 100644
index 0000000..4a57e97
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * This implements a local file based backend for storing application timeline
+ * information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class FileSystemTimelineWriterImpl extends AbstractService
+    implements TimelineWriter {
+
+  private String outputRoot;
+
+  /** Config param for timeline service storage tmp root for FILE YARN-3264 */
+  public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
+    = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
+
+  /** default value for storage location on local disk */
+  public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+    = "/tmp/timeline_service_data/";
+
+  /** Default extension for output files */
+  public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
+
+  FileSystemTimelineWriterImpl() {
+    super((FileSystemTimelineWriterImpl.class.getName()));
+  }
+
+  /**
+   * Stores the entire information in {@link TimelineEntity} to the
+   * timeline store. Any errors occurring for individual write request objects
+   * will be reported in the response.
+   * 
+   * @param data
+   *          a {@link TimelineEntity} object
+   * @return {@link TimelineWriteResponse} object.
+   * @throws IOException
+   */
+  @Override
+  public TimelineWriteResponse write(TimelineEntities entities)
+      throws IOException {
+    TimelineWriteResponse response = new TimelineWriteResponse();
+    for (TimelineEntity entity : entities.getEntities()) {
+      write(entity, response);
+    }
+    return response;
+  }
+
+  private void write(TimelineEntity entity,
+      TimelineWriteResponse response) throws IOException {
+    PrintWriter out = null;
+    try {
+      File outputDir = new File(outputRoot + entity.getType());
+      String fileName = outputDir + "/" + entity.getId()
+          + TIMELINE_SERVICE_STORAGE_EXTENSION;
+      if (!outputDir.exists()) {
+        if (!outputDir.mkdirs()) {
+          throw new IOException("Could not create directories for " + fileName);
+        }
+      }
+      out = new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)));
+      out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      out.write("\n");
+    } catch (IOException ioe) {
+      TimelineWriteError error = new TimelineWriteError();
+      error.setEntityId(entity.getId());
+      error.setEntityType(entity.getType());
+      /*
+       * TODO: set an appropriate error code after PoC could possibly be:
+       * error.setErrorCode(TimelineWriteError.IO_EXCEPTION);
+       */
+      response.addError(error);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  /**
+   * Aggregates the entity information to the timeline store based on which
+   * track this entity is to be rolled up to The tracks along which aggregations
+   * are to be done are given by {@link TimelineAggregationTrack}
+   * 
+   * Any errors occurring for individual write request objects will be reported
+   * in the response.
+   * 
+   * @param data
+   *          a {@link TimelineEntity} object
+   *          a {@link TimelineAggregationTrack} enum value
+   * @return a {@link TimelineWriteResponse} object.
+   * @throws IOException
+   */
+  public TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) throws IOException {
+    return null;
+
+  }
+
+  public String getOutputRoot() {
+    return outputRoot;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
new file mode 100644
index 0000000..955ca80
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineAggregationTrack.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+/**
+ * specifies the tracks along which an entity
+ * info is to be aggregated on
+ *
+ */
+public enum TimelineAggregationTrack {
+  FLOW, USER, QUEUE
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
new file mode 100644
index 0000000..71ad7ab
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.service.Service;
+
+/**
+ * This interface is for storing application timeline information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineWriter extends Service {
+
+  /**
+   * Stores the entire information in {@link TimelineEntities} to the
+   * timeline store. Any errors occurring for individual write request objects
+   * will be reported in the response.
+   *
+   * @param data
+   *          a {@link TimelineEntities} object.
+   * @return a {@link TimelineWriteResponse} object.
+   * @throws IOException
+   */
+  TimelineWriteResponse write(TimelineEntities data) throws IOException;
+
+  /**
+   * Aggregates the entity information to the timeline store based on which
+   * track this entity is to be rolled up to The tracks along which aggregations
+   * are to be done are given by {@link TimelineAggregationTrack}
+   *
+   * Any errors occurring for individual write request objects will be reported
+   * in the response.
+   *
+   * @param data
+   *          a {@link TimelineEntity} object
+   *          a {@link TimelineAggregationTrack} enum
+   *          value.
+   * @return a {@link TimelineWriteResponse} object.
+   * @throws IOException
+   */
+  TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
deleted file mode 100644
index 821e455..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-public class TestTimelineAggregator {
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b68d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
new file mode 100644
index 0000000..f720454
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.Test;
+import org.apache.commons.io.FileUtils;
+
+public class TestFileSystemTimelineWriterImpl {
+
+  /**
+   * Unit test for PoC YARN 3264
+   * @throws Exception
+   */
+  @Test
+  public void testWriteEntityToFile() throws Exception {
+    String name =  "unit_test_BaseAggregator_testWriteEntityToFile_"
+        + Long.toString(System.currentTimeMillis());
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    String id = "hello";
+    String type = "world";
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(1425016501000L);
+    entity.setModifiedTime(1425016502000L);
+    te.addEntity(entity);
+
+    FileSystemTimelineWriterImpl fsi = new FileSystemTimelineWriterImpl();
+    fsi.serviceInit(new Configuration());
+    fsi.write(te);
+
+    String fileName = fsi.getOutputRoot() + "/" + type + "/" + id
+        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+    Path path = Paths.get(fileName);
+    File f = new File(fileName);
+    assertTrue(f.exists() && !f.isDirectory());
+    List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
+    // ensure there's only one entity + 1 new line
+    assertTrue(data.size() == 2);
+    String d = data.get(0);
+    // confirm the contents same as what was written
+    assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+
+    // delete the directory
+    File outputDir = new File(fsi.getOutputRoot());
+    FileUtils.deleteDirectory(outputDir);
+    assertTrue(!(f.exists()));
+  }
+}


Mime
View raw message