hadoop-common-commits mailing list archives

From junping...@apache.org
Subject [1/2] hadoop git commit: YARN-4265. Provide new timeline plugin storage to support fine-grained entity caching. Contributed by Li Lu and Jason Lowe
Date Mon, 18 Jan 2016 01:28:58 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk da77f423d -> 02f597c5d


http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
new file mode 100644
index 0000000..71e26cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/EntityGroupPlugInForTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.SortedSet;
+
+class EntityGroupPlugInForTest extends TimelineEntityGroupPlugin {
+
+  private static TimelineEntityGroupId timelineEntityGroupId
+      = TimelineEntityGroupId.newInstance(
+      TestEntityGroupFSTimelineStore.TEST_APPLICATION_ID, "test");
+
+  @Override
+  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
+      NameValuePair primaryFilter,
+      Collection<NameValuePair> secondaryFilters) {
+    return Sets.newHashSet(timelineEntityGroupId);
+  }
+
+  @Override
+  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityId,
+      String entityType) {
+    return Sets.newHashSet(timelineEntityGroupId);
+  }
+
+  @Override
+  public Set<TimelineEntityGroupId> getTimelineEntityGroupId(String entityType,
+      SortedSet<String> entityIds,
+      Set<String> eventTypes) {
+    return Sets.newHashSet(timelineEntityGroupId);
+  }
+
+  static TimelineEntityGroupId getStandardTimelineGroupId() {
+    return timelineEntityGroupId;
+  }
+}
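
For context: a TimelineEntityGroupPlugin maps an entity lookup onto the
group ids whose cache files may contain the entity; the test plugin above
always answers with a single fixed group. A production plugin would more
plausibly derive the group from the owning application. A minimal sketch
(not part of this patch), where parseApplicationId() is a hypothetical
helper that recovers the ApplicationId from an entity id:

  // Sketch only: groups entities by their owning application.
  // Imports as in EntityGroupPlugInForTest, plus
  // org.apache.hadoop.yarn.api.records.ApplicationId.
  class AppScopedGroupPlugin extends TimelineEntityGroupPlugin {
    @Override
    public Set<TimelineEntityGroupId> getTimelineEntityGroupId(
        String entityId, String entityType) {
      ApplicationId appId = parseApplicationId(entityId); // hypothetical helper
      return Sets.newHashSet(
          TimelineEntityGroupId.newInstance(appId, entityType));
    }
    // The other two overloads would resolve group ids the same way.
  }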

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
new file mode 100644
index 0000000..e0379b1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/PluginStoreTestUtils.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+class PluginStoreTestUtils {
+
+  static FSDataOutputStream createLogFile(Path logPath, FileSystem fs)
+      throws IOException {
+    FSDataOutputStream stream;
+    stream = fs.create(logPath, true);
+    return stream;
+  }
+
+  static ObjectMapper createObjectMapper() {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector());
+    mapper.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+    mapper.configure(SerializationConfig.Feature.CLOSE_CLOSEABLE, false);
+    return mapper;
+  }
+
+  /**
+   * Create sample entities for testing
+   * @return two timeline entities in a {@link TimelineEntities} object
+   */
+  static TimelineEntities generateTestEntities() {
+    TimelineEntities entities = new TimelineEntities();
+    Map<String, Set<Object>> primaryFilters =
+        new HashMap<String, Set<Object>>();
+    Set<Object> l1 = new HashSet<Object>();
+    l1.add("username");
+    Set<Object> l2 = new HashSet<Object>();
+    l2.add(Integer.MAX_VALUE);
+    Set<Object> l3 = new HashSet<Object>();
+    l3.add("123abc");
+    Set<Object> l4 = new HashSet<Object>();
+    l4.add((long)Integer.MAX_VALUE + 1L);
+    primaryFilters.put("user", l1);
+    primaryFilters.put("appname", l2);
+    primaryFilters.put("other", l3);
+    primaryFilters.put("long", l4);
+    Map<String, Object> secondaryFilters = new HashMap<String, Object>();
+    secondaryFilters.put("startTime", 123456);
+    secondaryFilters.put("status", "RUNNING");
+    Map<String, Object> otherInfo1 = new HashMap<String, Object>();
+    otherInfo1.put("info1", "val1");
+    otherInfo1.putAll(secondaryFilters);
+
+    String entityId1 = "id_1";
+    String entityType1 = "type_1";
+    String entityId2 = "id_2";
+    String entityType2 = "type_2";
+
+    Map<String, Set<String>> relatedEntities =
+        new HashMap<String, Set<String>>();
+    relatedEntities.put(entityType2, Collections.singleton(entityId2));
+
+    TimelineEvent ev3 = createEvent(789L, "launch_event", null);
+    TimelineEvent ev4 = createEvent(0L, "init_event", null);
+    List<TimelineEvent> events = new ArrayList<TimelineEvent>();
+    events.add(ev3);
+    events.add(ev4);
+    entities.addEntity(createEntity(entityId2, entityType2, 456L, events, null,
+        null, null, "domain_id_1"));
+
+    TimelineEvent ev1 = createEvent(123L, "start_event", null);
+    entities.addEntity(createEntity(entityId1, entityType1, 123L,
+        Collections.singletonList(ev1), relatedEntities, primaryFilters,
+        otherInfo1, "domain_id_1"));
+    return entities;
+  }
+
+  static void verifyTestEntities(TimelineDataManager tdm)
+      throws YarnException, IOException {
+    TimelineEntity entity1 = tdm.getEntity("type_1", "id_1",
+        EnumSet.allOf(TimelineReader.Field.class),
+        UserGroupInformation.getLoginUser());
+    TimelineEntity entity2 = tdm.getEntity("type_2", "id_2",
+        EnumSet.allOf(TimelineReader.Field.class),
+        UserGroupInformation.getLoginUser());
+    assertNotNull(entity1);
+    assertNotNull(entity2);
+    assertEquals("Failed to read out entity 1",
+        (Long) 123l, entity1.getStartTime());
+    assertEquals("Failed to read out entity 2",
+        (Long) 456l, entity2.getStartTime());
+  }
+
+  /**
+   * Create a test entity
+   */
+  static TimelineEntity createEntity(String entityId, String entityType,
+      Long startTime, List<TimelineEvent> events,
+      Map<String, Set<String>> relatedEntities,
+      Map<String, Set<Object>> primaryFilters,
+      Map<String, Object> otherInfo, String domainId) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityId(entityId);
+    entity.setEntityType(entityType);
+    entity.setStartTime(startTime);
+    entity.setEvents(events);
+    if (relatedEntities != null) {
+      for (Map.Entry<String, Set<String>> e : relatedEntities.entrySet()) {
+        for (String v : e.getValue()) {
+          entity.addRelatedEntity(e.getKey(), v);
+        }
+      }
+    } else {
+      entity.setRelatedEntities(null);
+    }
+    entity.setPrimaryFilters(primaryFilters);
+    entity.setOtherInfo(otherInfo);
+    entity.setDomainId(domainId);
+    return entity;
+  }
+
+  /**
+   * Create a test event
+   */
+  static TimelineEvent createEvent(long timestamp, String type, Map<String,
+      Object> info) {
+    TimelineEvent event = new TimelineEvent();
+    event.setTimestamp(timestamp);
+    event.setEventType(type);
+    event.setEventInfo(info);
+    return event;
+  }
+
+  /**
+   * Write timeline entities to a file system
+   * @param entities the timeline entities to write
+   * @param logPath path of the log file to write to
+   * @param fs the file system to use
+   * @throws IOException on file system errors
+   */
+  static void writeEntities(TimelineEntities entities, Path logPath,
+      FileSystem fs) throws IOException {
+    FSDataOutputStream outStream = createLogFile(logPath, fs);
+    JsonGenerator jsonGenerator
+        = (new JsonFactory()).createJsonGenerator(outStream);
+    jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+    ObjectMapper objMapper = createObjectMapper();
+    for (TimelineEntity entity : entities.getEntities()) {
+      objMapper.writeValue(jsonGenerator, entity);
+    }
+    outStream.close();
+  }
+
+  static TimelineDataManager getTdmWithStore(Configuration config,
+      TimelineStore store) {
+    TimelineACLsManager aclManager = new TimelineACLsManager(config);
+    TimelineDataManager tdm = new TimelineDataManager(store, aclManager);
+    tdm.init(config);
+    return tdm;
+  }
+
+  static TimelineDataManager getTdmWithMemStore(Configuration config) {
+    TimelineStore store = new MemoryTimelineStore();
+    TimelineDataManager tdm = getTdmWithStore(config, store);
+    return tdm;
+  }
+
+}
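
Taken together these helpers give a compact write-then-read loop. A minimal
usage sketch (the log path is an assumed location for illustration):

  // Illustrative only; imports as in PluginStoreTestUtils.
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.getLocal(conf);
  Path logPath = new Path("/tmp/pluginstore-demo/summarylog-test"); // assumed
  PluginStoreTestUtils.writeEntities(
      PluginStoreTestUtils.generateTestEntities(), logPath, fs);
  TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(conf);
  // The written entities become visible to tdm once a parse step (for
  // example LogInfo#parseForStore, exercised in TestLogInfo below) loads
  // the file into the store.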

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
new file mode 100644
index 0000000..e43b705
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+
+import static org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils {
+
+  private static final String SAMPLE_APP_NAME = "1234_5678";
+
+  static final ApplicationId TEST_APPLICATION_ID
+      = ConverterUtils.toApplicationId(
+      ConverterUtils.APPLICATION_PREFIX + "_" + SAMPLE_APP_NAME);
+
+  private static final String TEST_APP_DIR_NAME
+      = TEST_APPLICATION_ID.toString();
+  private static final String TEST_ATTEMPT_DIR_NAME
+      = ApplicationAttemptId.appAttemptIdStrPrefix + SAMPLE_APP_NAME + "_1";
+  private static final String TEST_SUMMARY_LOG_FILE_NAME
+      = EntityGroupFSTimelineStore.SUMMARY_LOG_PREFIX + "test";
+  private static final String TEST_ENTITY_LOG_FILE_NAME
+      = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
+          + EntityGroupPlugInForTest.getStandardTimelineGroupId();
+  private static final String TEST_DOMAIN_LOG_FILE_NAME
+      = EntityGroupFSTimelineStore.DOMAIN_LOG_PREFIX + "test";
+
+  private static final Path TEST_ROOT_DIR
+      = new Path(System.getProperty("test.build.data",
+          System.getProperty("java.io.tmpdir")),
+      TestEntityGroupFSTimelineStore.class.getSimpleName());
+  private static final Path TEST_APP_DIR_PATH
+      = new Path(TEST_ROOT_DIR, TEST_APP_DIR_NAME);
+  private static final Path TEST_ATTEMPT_DIR_PATH
+      = new Path(TEST_APP_DIR_PATH, TEST_ATTEMPT_DIR_NAME);
+  private static final Path TEST_DONE_DIR_PATH
+      = new Path(TEST_ROOT_DIR, "done");
+
+  private static Configuration config = new YarnConfiguration();
+  private static MiniDFSCluster hdfsCluster;
+  private static FileSystem fs;
+  private EntityGroupFSTimelineStore store;
+  private TimelineEntity entityNew;
+
+  @Rule
+  public TestName currTestName = new TestName();
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
+    config.set(
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_ENTITY_TYPES,
+        "YARN_APPLICATION,YARN_APPLICATION_ATTEMPT,YARN_CONTAINER");
+    config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
+        TEST_DONE_DIR_PATH.toString());
+    config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
+    HdfsConfiguration hdfsConfig = new HdfsConfiguration();
+    hdfsCluster
+        = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
+    fs = hdfsCluster.getFileSystem();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    createTestFiles();
+    store = new EntityGroupFSTimelineStore();
+    if (currTestName.getMethodName().contains("Plugin")) {
+      config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
+          EntityGroupPlugInForTest.class.getName());
+    }
+    store.init(config);
+    store.start();
+    store.setFs(fs);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.delete(TEST_APP_DIR_PATH, true);
+    store.stop();
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    hdfsCluster.shutdown();
+    FileContext fileContext = FileContext.getLocalFSFileContext();
+    fileContext.delete(new Path(
+        config.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)), true);
+  }
+
+  @Test
+  public void testAppLogsScanLogs() throws Exception {
+    EntityGroupFSTimelineStore.AppLogs appLogs =
+        store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
+        AppState.COMPLETED);
+    appLogs.scanForLogs();
+    List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
+    List<LogInfo> detailLogs = appLogs.getDetailLogs();
+    assertEquals(2, summaryLogs.size());
+    assertEquals(1, detailLogs.size());
+
+    for (LogInfo log : summaryLogs) {
+      String fileName = log.getFilename();
+      assertTrue(fileName.equals(TEST_SUMMARY_LOG_FILE_NAME)
+          || fileName.equals(TEST_DOMAIN_LOG_FILE_NAME));
+    }
+
+    for (LogInfo log : detailLogs) {
+      String fileName = log.getFilename();
+      assertEquals(TEST_ENTITY_LOG_FILE_NAME, fileName);
+    }
+  }
+
+  @Test
+  public void testMoveToDone() throws Exception {
+    EntityGroupFSTimelineStore.AppLogs appLogs =
+        store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
+        AppState.COMPLETED);
+    Path pathBefore = appLogs.getAppDirPath();
+    appLogs.moveToDone();
+    Path pathAfter = appLogs.getAppDirPath();
+    assertNotEquals(pathBefore, pathAfter);
+    assertTrue(pathAfter.toString().contains(TEST_DONE_DIR_PATH.toString()));
+  }
+
+  @Test
+  public void testParseSummaryLogs() throws Exception {
+    TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
+    EntityGroupFSTimelineStore.AppLogs appLogs =
+        store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
+        AppState.COMPLETED);
+    appLogs.scanForLogs();
+    appLogs.parseSummaryLogs(tdm);
+    PluginStoreTestUtils.verifyTestEntities(tdm);
+  }
+
+  @Test
+  public void testCleanLogs() throws Exception {
+    // Create test dirs and files
+    // Irrelevant file, should not be reclaimed
+    Path irrelevantFilePath = new Path(
+        TEST_DONE_DIR_PATH, "irrelevant.log");
+    FSDataOutputStream stream = fs.create(irrelevantFilePath);
+    stream.close();
+    // Irrelevant directory, should not be reclaimed
+    Path irrelevantDirPath = new Path(TEST_DONE_DIR_PATH, "irrelevant");
+    fs.mkdirs(irrelevantDirPath);
+
+    Path doneAppHomeDir = new Path(new Path(TEST_DONE_DIR_PATH, "0000"), "001");
+    // First application, untouched after creation
+    Path appDirClean = new Path(doneAppHomeDir, TEST_APP_DIR_NAME);
+    Path attemptDirClean = new Path(appDirClean, TEST_ATTEMPT_DIR_NAME);
+    fs.mkdirs(attemptDirClean);
+    Path filePath = new Path(attemptDirClean, "test.log");
+    stream = fs.create(filePath);
+    stream.close();
+    // Second application, one file touched after creation
+    Path appDirHoldByFile = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "1");
+    Path attemptDirHoldByFile
+        = new Path(appDirHoldByFile, TEST_ATTEMPT_DIR_NAME);
+    fs.mkdirs(attemptDirHoldByFile);
+    Path filePathHold = new Path(attemptDirHoldByFile, "test1.log");
+    stream = fs.create(filePathHold);
+    stream.close();
+    // Third application, one dir touched after creation
+    Path appDirHoldByDir = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "2");
+    Path attemptDirHoldByDir = new Path(appDirHoldByDir, TEST_ATTEMPT_DIR_NAME);
+    fs.mkdirs(attemptDirHoldByDir);
+    Path dirPathHold = new Path(attemptDirHoldByDir, "hold");
+    fs.mkdirs(dirPathHold);
+    // Fourth application, empty dirs
+    Path appDirEmpty = new Path(doneAppHomeDir, TEST_APP_DIR_NAME + "3");
+    Path attemptDirEmpty = new Path(appDirEmpty, TEST_ATTEMPT_DIR_NAME);
+    fs.mkdirs(attemptDirEmpty);
+    Path dirPathEmpty = new Path(attemptDirEmpty, "empty");
+    fs.mkdirs(dirPathEmpty);
+
+    // Should retain all logs after this run
+    EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000);
+    assertTrue(fs.exists(irrelevantDirPath));
+    assertTrue(fs.exists(irrelevantFilePath));
+    assertTrue(fs.exists(filePath));
+    assertTrue(fs.exists(filePathHold));
+    assertTrue(fs.exists(dirPathHold));
+    assertTrue(fs.exists(dirPathEmpty));
+
+    // Make sure the created dir is old enough
+    Thread.sleep(2000);
+    // Touch the second application
+    stream = fs.append(filePathHold);
+    stream.writeBytes("append");
+    stream.close();
+    // Touch the third application by creating a new dir
+    fs.mkdirs(new Path(dirPathHold, "holdByMe"));
+
+    EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000);
+
+    // Verification after the second cleaner call
+    assertTrue(fs.exists(irrelevantDirPath));
+    assertTrue(fs.exists(irrelevantFilePath));
+    assertTrue(fs.exists(filePathHold));
+    assertTrue(fs.exists(dirPathHold));
+    assertTrue(fs.exists(doneAppHomeDir));
+
+    // appDirClean and appDirEmpty should be cleaned up
+    assertFalse(fs.exists(appDirClean));
+    assertFalse(fs.exists(appDirEmpty));
+  }
+
+  @Test
+  public void testPluginRead() throws Exception {
+    // Verify precondition
+    assertEquals(EntityGroupPlugInForTest.class.getName(),
+        store.getConfig().get(
+            YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES));
+    // Prepare the timeline store by loading data into a cache item
+    EntityGroupFSTimelineStore.AppLogs appLogs =
+        store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
+        AppState.COMPLETED);
+    EntityCacheItem cacheItem = new EntityCacheItem(config, fs);
+    cacheItem.setAppLogs(appLogs);
+    store.setCachedLogs(
+        EntityGroupPlugInForTest.getStandardTimelineGroupId(), cacheItem);
+    // Generate TDM
+    TimelineDataManager tdm
+        = PluginStoreTestUtils.getTdmWithStore(config, store);
+
+    // Verify single entity read
+    TimelineEntity entity3 = tdm.getEntity("type_3", "id_3",
+        EnumSet.allOf(TimelineReader.Field.class),
+        UserGroupInformation.getLoginUser());
+    assertNotNull(entity3);
+    assertEquals(entityNew.getStartTime(), entity3.getStartTime());
+    // Verify multiple entities read
+    TimelineEntities entities = tdm.getEntities("type_3", null, null, null,
+        null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
+        UserGroupInformation.getLoginUser());
+    assertEquals(1, entities.getEntities().size());
+    for (TimelineEntity entity : entities.getEntities()) {
+      assertEquals(entityNew.getStartTime(), entity.getStartTime());
+    }
+  }
+
+  @Test
+  public void testSummaryRead() throws Exception {
+    // Load data
+    EntityGroupFSTimelineStore.AppLogs appLogs =
+        store.new AppLogs(TEST_APPLICATION_ID, TEST_APP_DIR_PATH,
+        AppState.COMPLETED);
+    TimelineDataManager tdm
+        = PluginStoreTestUtils.getTdmWithStore(config, store);
+    appLogs.scanForLogs();
+    appLogs.parseSummaryLogs(tdm);
+
+    // Verify single entity read
+    PluginStoreTestUtils.verifyTestEntities(tdm);
+    // Verify multiple entities read
+    TimelineEntities entities = tdm.getEntities("type_1", null, null, null,
+        null, null, null, null, EnumSet.allOf(TimelineReader.Field.class),
+        UserGroupInformation.getLoginUser());
+    assertEquals(1, entities.getEntities().size());
+    for (TimelineEntity entity : entities.getEntities()) {
+      assertEquals((Long) 123L, entity.getStartTime());
+    }
+  }
+
+  private void createTestFiles() throws IOException {
+    TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
+    PluginStoreTestUtils.writeEntities(entities,
+        new Path(TEST_ATTEMPT_DIR_PATH, TEST_SUMMARY_LOG_FILE_NAME), fs);
+
+    entityNew = PluginStoreTestUtils
+        .createEntity("id_3", "type_3", 789L, null, null,
+            null, null, "domain_id_1");
+    TimelineEntities entityList = new TimelineEntities();
+    entityList.addEntity(entityNew);
+    PluginStoreTestUtils.writeEntities(entityList,
+        new Path(TEST_ATTEMPT_DIR_PATH, TEST_ENTITY_LOG_FILE_NAME), fs);
+
+    FSDataOutputStream out = fs.create(
+        new Path(TEST_ATTEMPT_DIR_PATH, TEST_DOMAIN_LOG_FILE_NAME));
+    out.close();
+  }
+
+}
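
The configuration keys exercised by this test are the same ones a deployment
would set to enable the plugin storage. A minimal wiring sketch (the plugin
class name and done directory are placeholders, not values from this patch):

  Configuration conf = new YarnConfiguration();
  conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
      "com.example.MyGroupPlugin");       // placeholder plugin class
  conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
      "/ats/done");                       // placeholder done dir
  EntityGroupFSTimelineStore store = new EntityGroupFSTimelineStore();
  store.init(conf);
  store.start();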

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java
new file mode 100644
index 0000000..fa6fcc7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.util.MinimalPrettyPrinter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.EnumSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestLogInfo {
+
+  private static final Path TEST_ROOT_DIR = new Path(
+      System.getProperty("test.build.data",
+          System.getProperty("java.io.tmpdir")),
+      TestLogInfo.class.getSimpleName());
+
+  private static final String TEST_ATTEMPT_DIR_NAME = "test_app";
+  private static final String TEST_ENTITY_FILE_NAME = "test_entity";
+  private static final String TEST_DOMAIN_FILE_NAME = "test_domain";
+  private static final String TEST_BROKEN_FILE_NAME = "test_broken";
+
+  private Configuration config = new YarnConfiguration();
+  private MiniDFSCluster hdfsCluster;
+  private FileSystem fs;
+  private ObjectMapper objMapper;
+
+  private JsonFactory jsonFactory = new JsonFactory();
+  private JsonGenerator jsonGenerator;
+  private FSDataOutputStream outStream = null;
+  private FSDataOutputStream outStreamDomain = null;
+
+  private TimelineDomain testDomain;
+
+  private static final short FILE_LOG_DIR_PERMISSIONS = 0770;
+
+  @Before
+  public void setup() throws Exception {
+    config.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR.toString());
+    HdfsConfiguration hdfsConfig = new HdfsConfiguration();
+    hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig).numDataNodes(1).build();
+    fs = hdfsCluster.getFileSystem();
+    Path testAppDirPath = new Path(TEST_ROOT_DIR, TEST_ATTEMPT_DIR_NAME);
+    fs.mkdirs(testAppDirPath, new FsPermission(FILE_LOG_DIR_PERMISSIONS));
+    objMapper = PluginStoreTestUtils.createObjectMapper();
+
+    TimelineEntities testEntities = PluginStoreTestUtils.generateTestEntities();
+    writeEntitiesLeaveOpen(testEntities,
+        new Path(testAppDirPath, TEST_ENTITY_FILE_NAME));
+
+    testDomain = new TimelineDomain();
+    testDomain.setId("domain_1");
+    testDomain.setReaders(UserGroupInformation.getLoginUser().getUserName());
+    testDomain.setOwner(UserGroupInformation.getLoginUser().getUserName());
+    testDomain.setDescription("description");
+    writeDomainLeaveOpen(testDomain,
+        new Path(testAppDirPath, TEST_DOMAIN_FILE_NAME));
+
+    writeBrokenFile(new Path(testAppDirPath, TEST_BROKEN_FILE_NAME));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    jsonGenerator.close();
+    outStream.close();
+    outStreamDomain.close();
+    hdfsCluster.shutdown();
+  }
+
+  @Test
+  public void testMatchesGroupId() throws Exception {
+    String testGroupId = "app1_group1";
+    // Match
+    EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
+        "app1_group1",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertTrue(testLogInfo.matchesGroupId(testGroupId));
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
+        "test_app1_group1",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertTrue(testLogInfo.matchesGroupId(testGroupId));
+    // No match
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2_group1",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertFalse(testLogInfo.matchesGroupId(testGroupId));
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group2",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertFalse(testLogInfo.matchesGroupId(testGroupId));
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group12",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertFalse(testLogInfo.matchesGroupId(testGroupId));
+    // Check delimiters
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1_2",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertTrue(testLogInfo.matchesGroupId(testGroupId));
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app1_group1.dat",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertTrue(testLogInfo.matchesGroupId(testGroupId));
+    // Check file names shorter than group id
+    testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME, "app2",
+        UserGroupInformation.getLoginUser().getUserName());
+    assertFalse(testLogInfo.matchesGroupId(testGroupId));
+  }
+
+  @Test
+  public void testParseEntity() throws Exception {
+    // Load test data
+    TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
+    EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
+        TEST_ENTITY_FILE_NAME,
+        UserGroupInformation.getLoginUser().getUserName());
+    testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
+        fs);
+    // Verify for the first batch
+    PluginStoreTestUtils.verifyTestEntities(tdm);
+    // Load new data
+    TimelineEntity entityNew = PluginStoreTestUtils
+        .createEntity("id_3", "type_3", 789L, null, null,
+            null, null, "domain_id_1");
+    TimelineEntities entityList = new TimelineEntities();
+    entityList.addEntity(entityNew);
+    writeEntitiesLeaveOpen(entityList,
+        new Path(new Path(TEST_ROOT_DIR, TEST_ATTEMPT_DIR_NAME),
+            TEST_ENTITY_FILE_NAME));
+    testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
+        fs);
+    // Verify the newly added data
+    TimelineEntity entity3 = tdm.getEntity(entityNew.getEntityType(),
+        entityNew.getEntityId(), EnumSet.allOf(TimelineReader.Field.class),
+        UserGroupInformation.getLoginUser());
+    assertNotNull(entity3);
+    assertEquals("Failed to read out entity new",
+        entityNew.getStartTime(), entity3.getStartTime());
+    tdm.close();
+  }
+
+  @Test
+  public void testParseBrokenEntity() throws Exception {
+    // Load test data
+    TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
+    EntityLogInfo testLogInfo = new EntityLogInfo(TEST_ATTEMPT_DIR_NAME,
+        TEST_BROKEN_FILE_NAME,
+        UserGroupInformation.getLoginUser().getUserName());
+    DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME,
+        TEST_BROKEN_FILE_NAME,
+        UserGroupInformation.getLoginUser().getUserName());
+    // Try to parse; this should not fail
+    testLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
+        fs);
+    domainLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
+        fs);
+    tdm.close();
+  }
+
+  @Test
+  public void testParseDomain() throws Exception {
+    // Load test data
+    TimelineDataManager tdm = PluginStoreTestUtils.getTdmWithMemStore(config);
+    DomainLogInfo domainLogInfo = new DomainLogInfo(TEST_ATTEMPT_DIR_NAME,
+        TEST_DOMAIN_FILE_NAME,
+        UserGroupInformation.getLoginUser().getUserName());
+    domainLogInfo.parseForStore(tdm, TEST_ROOT_DIR, true, jsonFactory, objMapper,
+        fs);
+    // Verify domain data
+    TimelineDomain resultDomain = tdm.getDomain("domain_1",
+        UserGroupInformation.getLoginUser());
+    assertNotNull(resultDomain);
+    assertEquals(testDomain.getReaders(), resultDomain.getReaders());
+    assertEquals(testDomain.getOwner(), resultDomain.getOwner());
+    assertEquals(testDomain.getDescription(), resultDomain.getDescription());
+  }
+
+  private void writeBrokenFile(Path logPath) throws IOException {
+    FSDataOutputStream out = null;
+    try {
+      String broken = "{ broken { [[]} broken";
+      out = PluginStoreTestUtils.createLogFile(logPath, fs);
+      out.write(broken.getBytes(Charset.forName("UTF-8")));
+      out.close();
+      out = null;
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  // TestLogInfo needs to keep HDFS files open, so we provide our own write
+  // methods rather than reuse PluginStoreTestUtils.writeEntities
+  private void writeEntitiesLeaveOpen(TimelineEntities entities, Path logPath)
+      throws IOException {
+    if (outStream == null) {
+      outStream = PluginStoreTestUtils.createLogFile(logPath, fs);
+      jsonGenerator = (new JsonFactory()).createJsonGenerator(outStream);
+      jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+    }
+    for (TimelineEntity entity : entities.getEntities()) {
+      objMapper.writeValue(jsonGenerator, entity);
+    }
+    outStream.hflush();
+  }
+
+  private void writeDomainLeaveOpen(TimelineDomain domain, Path logPath)
+      throws IOException {
+    if (outStreamDomain == null) {
+      outStreamDomain = PluginStoreTestUtils.createLogFile(logPath, fs);
+    }
+    // Domain writes use their own JSON generator, isolated from the entity writers
+    JsonGenerator jsonGeneratorLocal
+        = (new JsonFactory()).createJsonGenerator(outStreamDomain);
+    jsonGeneratorLocal.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
+    objMapper.writeValue(jsonGeneratorLocal, domain);
+    outStreamDomain.hflush();
+  }
+
+}
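
Since the writers emit one JSON object per line and hflush() after each
batch, a reader can pick up a half-written log incrementally. A sketch of
the matching read side using the same legacy Codehaus Jackson API (logPath
and the open call are illustrative):

  // Reads back the newline-separated entity JSON written above.
  // Also needs org.codehaus.jackson.JsonParser and
  // org.codehaus.jackson.map.MappingIterator.
  JsonParser parser =
      new JsonFactory().createJsonParser(fs.open(logPath)); // assumed stream
  ObjectMapper mapper = PluginStoreTestUtils.createObjectMapper();
  MappingIterator<TimelineEntity> it =
      mapper.readValues(parser, TimelineEntity.class);
  while (it.hasNext()) {
    TimelineEntity entity = it.next(); // process each entity as parsed
  }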

http://git-wip-us.apache.org/repos/asf/hadoop/blob/02f597c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
index 0043115..b217ca4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml
@@ -42,5 +42,6 @@
     <module>hadoop-yarn-server-sharedcachemanager</module>
     <module>hadoop-yarn-server-tests</module>
     <module>hadoop-yarn-server-applicationhistoryservice</module>
+    <module>hadoop-yarn-server-timeline-pluginstorage</module>
   </modules>
 </project>

